Skip to content

Commit

Permalink
Wait for stdout pipe to close before calling runner.Wait() (#299)
Browse files Browse the repository at this point in the history
If the two goroutines are left to race each other, when runner.Wait() wins it will close the file and cause the stdout scanner to log a spurious os.ErrClosed error instead of returning nil after encountering an io.EOF error.
  • Loading branch information
gabivlj authored Feb 12, 2024
1 parent 586d14f commit 90c365e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
18 changes: 10 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ type Client struct {
// goroutines.
clientWaitGroup sync.WaitGroup

// stderrWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stderr pipe.
stderrWaitGroup sync.WaitGroup
// pipesWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stdout and stderr pipe.
pipesWaitGroup sync.WaitGroup

// processKilled is used for testing only, to flag when the process was
// forcefully killed.
Expand Down Expand Up @@ -756,8 +756,8 @@ func (c *Client) Start() (addr net.Addr, err error) {

// Start goroutine that logs the stderr
c.clientWaitGroup.Add(1)
c.stderrWaitGroup.Add(1)
// logStderr calls Done()
c.pipesWaitGroup.Add(1)
// logStderr calls c.pipesWaitGroup.Done()
go c.logStderr(runner.Name(), runner.Stderr())

c.clientWaitGroup.Add(1)
Expand All @@ -767,9 +767,9 @@ func (c *Client) Start() (addr net.Addr, err error) {

defer c.clientWaitGroup.Done()

// wait to finish reading from stderr since the stderr pipe reader
// wait to finish reading from stdout/stderr since the stdout/stderr pipe readers
// will be closed by the subsequent call to cmd.Wait().
c.stderrWaitGroup.Wait()
c.pipesWaitGroup.Wait()

// Wait for the command to end.
err := runner.Wait(context.Background())
Expand All @@ -792,8 +792,10 @@ func (c *Client) Start() (addr net.Addr, err error) {
// out of stdout
linesCh := make(chan string)
c.clientWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
go func() {
defer c.clientWaitGroup.Done()
defer c.pipesWaitGroup.Done()
defer close(linesCh)

scanner := bufio.NewScanner(runner.Stdout())
Expand Down Expand Up @@ -1159,7 +1161,7 @@ func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) {

func (c *Client) logStderr(name string, r io.Reader) {
defer c.clientWaitGroup.Done()
defer c.stderrWaitGroup.Done()
defer c.pipesWaitGroup.Done()
l := c.logger.Named(filepath.Base(name))

reader := bufio.NewReaderSize(r, c.config.PluginLogBufferSize)
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ this line is short

reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
read := stderr.String()

Expand Down Expand Up @@ -1531,7 +1531,7 @@ func TestClient_logStderrParseJSON(t *testing.T) {
{"@message": "this is a large message that is more than 64 bytes long", "@level": "info"}`
reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
logs := strings.Split(strings.TrimSpace(logBuf.String()), "\n")

Expand Down

0 comments on commit 90c365e

Please sign in to comment.