Skip to content

Commit

Permalink
use StdoutPipe and StderrPipe
Browse files Browse the repository at this point in the history
This uses proper os.File file descriptors for stdin and stdout rather
than io.Pipe, removing extra copy goroutines from each, and
automatically closes the fds when necessary.

Don't scan with slices from stdout, as they are handled concurrently and
the backing array may be re-used.
  • Loading branch information
jbardin committed Dec 11, 2018
1 parent 5de8659 commit aae778a
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,15 +550,20 @@ func (c *Client) Start() (addr net.Addr, err error) {
fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
}

stdout_r, stdout_w := io.Pipe()
stderr_r, stderr_w := io.Pipe()

cmd := c.config.Cmd
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)
cmd.Stdin = os.Stdin
cmd.Stderr = stderr_w
cmd.Stdout = stdout_w

// these cannot return errors at this point
cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}

if c.config.SecureConfig != nil {
if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
Expand Down Expand Up @@ -617,20 +622,20 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Start goroutine to wait for process to exit
exitCh := make(chan struct{})
go func() {
// Make sure we close the write end of our stderr/stdout so
// that the readers send EOF properly.
defer stderr_w.Close()
defer stdout_w.Close()

// ensure the context is cancelled when we're done
defer ctxCancel()

// get the cmd info early, since the process information will be removed
// in Kill.
pid := c.process.Pid
path := cmd.Path

// Wait for the command to end.
err := cmd.Wait()

debugMsgArgs := []interface{}{
"path", cmd.Path,
"pid", c.process.Pid,
"path", path,
"pid", pid,
}
if err != nil {
debugMsgArgs = append(debugMsgArgs,
Expand All @@ -651,32 +656,25 @@ func (c *Client) Start() (addr net.Addr, err error) {
}()

// Start goroutine that logs the stderr
go c.logStderr(stderr_r)
go c.logStderr(cmdStderr)

// Start a goroutine that is going to be reading the lines
// out of stdout
linesCh := make(chan []byte)
linesCh := make(chan string)
go func() {
defer close(linesCh)

buf := bufio.NewReader(stdout_r)
for {
line, err := buf.ReadBytes('\n')
if line != nil {
linesCh <- line
}

if err == io.EOF {
return
}
scanner := bufio.NewScanner(cmdStdout)
for scanner.Scan() {
linesCh <- scanner.Text()
}
}()

// Make sure after we exit we read the lines from stdout forever
// so they don't block since it is an io.Pipe
// so they don't block since it is a pipe.
defer func() {
go func() {
for _ = range linesCh {
for range linesCh {
}
}()
}()
Expand All @@ -691,10 +689,10 @@ func (c *Client) Start() (addr net.Addr, err error) {
err = errors.New("timeout while waiting for plugin to start")
case <-exitCh:
err = errors.New("plugin exited before we could connect")
case lineBytes := <-linesCh:
case line := <-linesCh:
// Trim the line and split by "|" in order to get the parts of
// the output.
line := strings.TrimSpace(string(lineBytes))
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "|", 6)
if len(parts) < 4 {
err = fmt.Errorf(
Expand Down

0 comments on commit aae778a

Please sign in to comment.