diff --git a/api/server/router/container/container_routes.go b/api/server/router/container/container_routes.go index 6823a1a223182..2852623fe94dc 100644 --- a/api/server/router/container/container_routes.go +++ b/api/server/router/container/container_routes.go @@ -18,6 +18,7 @@ import ( "github.com/docker/docker/api/types/versions" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/signal" + "github.com/docker/docker/pkg/stdcopy" "golang.org/x/net/context" "golang.org/x/net/websocket" ) @@ -108,9 +109,10 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response select { case <-chStarted: // The client may be expecting all of the data we're sending to - // be multiplexed, so send it through OutStream, which will - // have been set up to handle that if needed. - fmt.Fprintf(logsConfig.OutStream, "Error running logs job: %v\n", err) + // be multiplexed, so mux it through the Systemerr stream, which + // will cause the client to throw an error when demuxing + stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr) + fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err) default: return err } diff --git a/api/server/router/swarm/cluster_routes.go b/api/server/router/swarm/cluster_routes.go index 59420fe90583e..076d4a4452e6d 100644 --- a/api/server/router/swarm/cluster_routes.go +++ b/api/server/router/swarm/cluster_routes.go @@ -13,6 +13,7 @@ import ( "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/filters" types "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/pkg/stdcopy" "golang.org/x/net/context" ) @@ -252,7 +253,8 @@ func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter // The client may be expecting all of the data we're sending to // be multiplexed, so send it through OutStream, which will // have been set up to handle that if needed. - fmt.Fprintf(logsConfig.OutStream, "Error grabbing service logs: %v\n", err) + stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr) + fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err) default: return err } diff --git a/pkg/stdcopy/stdcopy.go b/pkg/stdcopy/stdcopy.go index be20765457401..a018a203f3865 100644 --- a/pkg/stdcopy/stdcopy.go +++ b/pkg/stdcopy/stdcopy.go @@ -20,6 +20,9 @@ const ( Stdout // Stderr represents standard error steam type. Stderr + // Systemerr represents errors originating from the system that make it + // into the the multiplexed stream. + Systemerr stdWriterPrefixLen = 8 stdWriterFdIndex = 0 @@ -115,8 +118,9 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) } } + stream := StdType(buf[stdWriterFdIndex]) // Check the first byte to know where to write - switch StdType(buf[stdWriterFdIndex]) { + switch stream { case Stdin: fallthrough case Stdout: @@ -125,6 +129,11 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) case Stderr: // Write on stderr out = dsterr + case Systemerr: + // If we're on Systemerr, we won't write anywhere. + // NB: if this code changes later, make sure you don't try to write + // to outstream if Systemerr is the stream + out = nil default: return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) } @@ -155,11 +164,18 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) } } + // we might have an error from the source mixed up in our multiplexed + // stream. if we do, return it. + if stream == Systemerr { + return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen])) + } + // Write the retrieved frame (without header) nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) if ew != nil { return 0, ew } + // If the frame has not been fully written: error if nw != frameSize { return 0, io.ErrShortWrite diff --git a/pkg/stdcopy/stdcopy_test.go b/pkg/stdcopy/stdcopy_test.go index 3137a75239892..b3e2c4dfd8ee1 100644 --- a/pkg/stdcopy/stdcopy_test.go +++ b/pkg/stdcopy/stdcopy_test.go @@ -246,6 +246,35 @@ func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) { } } +// TestStdCopyReturnsErrorFromSystem tests that StdCopy correctly returns an +// error, when that error is muxed into the Systemerr stream. +func TestStdCopyReturnsErrorFromSystem(t *testing.T) { + // write in the basic messages, just so there's some fluff in there + stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) + stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) + buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) + if err != nil { + t.Fatal(err) + } + // add in an error message on the Systemerr stream + systemErrBytes := []byte(strings.Repeat("S", startingBufLen)) + systemWriter := NewStdWriter(buffer, Systemerr) + _, err = systemWriter.Write(systemErrBytes) + if err != nil { + t.Fatal(err) + } + + // now copy and demux. we should expect an error containing the string we + // wrote out + _, err = StdCopy(ioutil.Discard, ioutil.Discard, buffer) + if err == nil { + t.Fatal("expected error, got none") + } + if !strings.Contains(err.Error(), string(systemErrBytes)) { + t.Fatal("expected error to contain message") + } +} + func BenchmarkWrite(b *testing.B) { w := NewStdWriter(ioutil.Discard, Stdout) data := []byte("Test line for testing stdwriter performance\n")