Commit 55bfb59b authored by Cezar Sá Espinola's avatar Cezar Sá Espinola Committed by Joffrey F

plugin/localbinary: Exit output stream goroutines when plugin closes

This commit ensures that when a plugin instance is closed the goroutines
responsible for streaming stdout and stderr of the called binary will
also exit, preventing a goroutines leak.

Before this commit these goroutines could stay blocked forever if
Close() was called while the binary still had some pending output.

I found this bug after debugging a real world goroutine leak, the
goroutines dump would show thousands of goroutines at:

```, 0xc0000aad20)
	/home/travis/gopath/src/ +0x7c
created by*Plugin).AttachStream
	/home/travis/gopath/src/ +0x67
Signed-off-by: Cezar Sá Espinola's avatarCezar Sa Espinola <[email protected]>
parent 5a8ce1ae
......@@ -72,7 +72,7 @@ type Plugin struct {
Addr string
MachineName string
addrCh chan string
stopCh chan bool
stopCh chan struct{}
timeout time.Duration
......@@ -121,7 +121,7 @@ func NewPlugin(driverName string) (*Plugin, error) {
log.Debugf("Found binary path at %s", binaryPath)
return &Plugin{
stopCh: make(chan bool),
stopCh: make(chan struct{}),
addrCh: make(chan string, 1),
Executor: &Executor{
DriverName: driverName,
......@@ -168,19 +168,23 @@ func (lbe *Executor) Close() error {
return nil
func stream(scanner *bufio.Scanner, streamOutCh chan<- string) {
func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan struct{}) {
for scanner.Scan() {
line := scanner.Text()
if err := scanner.Err(); err != nil {
log.Warnf("Scanning stream: %s", err)
streamOutCh <- strings.Trim(line, "\n")
select {
case streamOutCh <- strings.Trim(line, "\n"):
case <-stopCh:
func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) <-chan string {
streamOutCh := make(chan string)
go stream(scanner, streamOutCh)
go stream(scanner, streamOutCh, lbp.stopCh)
return streamOutCh
......@@ -241,6 +245,6 @@ func (lbp *Plugin) Address() (string, error) {
func (lbp *Plugin) Close() error {
lbp.stopCh <- true
return nil
......@@ -71,10 +71,10 @@ func TestLocalBinaryPluginAddressTimeout(t *testing.T) {
func TestLocalBinaryPluginClose(t *testing.T) {
lbp := &Plugin{}
lbp.stopCh = make(chan bool, 1)
lbp.stopCh = make(chan struct{})
go lbp.Close()
stopped := <-lbp.stopCh
if !stopped {
_, isOpen := <-lbp.stopCh
if isOpen {
t.Fatal("Close did not send a stop message on the proper channel")
......@@ -106,7 +106,7 @@ func TestExecServer(t *testing.T) {
MachineName: machineName,
Executor: fe,
addrCh: make(chan string, 1),
stopCh: make(chan bool, 1),
stopCh: make(chan struct{}),
finalErr := make(chan error)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment