Skip to content

Commit 0b7aefd

Browse files
committed
[FAB-13239] terminate container streaming output loop
When vm.docker.attachStdout is enabled, output from chaincode containers is written to the peer log. A recent change went in that broke the error handling behavior and resulted in a tight loop of reading from a closed reader and issuing an error message. This change fixes the error handling and back-fills test. Change-Id: Icda853dba90b873f2fbddde990c5f61f7834f1c9 Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent e485f77 commit 0b7aefd

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

core/container/dockercontroller/dockercontroller.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,8 @@ func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload ma
264264

265265
// stream stdout and stderr to chaincode logger
266266
if attachStdout {
267-
go vm.streamOutput(client, containerName)
267+
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
268+
streamOutput(dockerLogger, client, containerName, containerLogger)
268269
}
269270

270271
// upload specified files to the container before starting it
@@ -309,7 +310,7 @@ func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload ma
309310
}
310311

311312
// streamOutput mirrors output from the named container to a fabric logger.
312-
func (vm *DockerVM) streamOutput(client dockerClient, containerName string) {
313+
func streamOutput(logger *flogging.FabricLogger, client dockerClient, containerName string, containerLogger *flogging.FabricLogger) {
313314
// Launch a few go routines to manage output streams from the container.
314315
// They will be automatically destroyed when the container exits
315316
attached := make(chan struct{})
@@ -337,19 +338,18 @@ func (vm *DockerVM) streamOutput(client dockerClient, containerName string) {
337338
}()
338339

339340
go func() {
341+
defer r.Close() // ensure the pipe reader gets closed
342+
340343
// Block here until the attachment completes or we timeout
341344
select {
342345
case <-attached: // successful attach
343346
close(attached) // close indicates the streams can now be copied
344347

345348
case <-time.After(10 * time.Second):
346-
dockerLogger.Errorf("Timeout while attaching to IO channel in container %s", containerName)
349+
logger.Errorf("Timeout while attaching to IO channel in container %s", containerName)
347350
return
348351
}
349352

350-
// create a logger for the chaincode
351-
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
352-
353353
is := bufio.NewReader(r)
354354
for {
355355
// Loop forever dumping lines of text into the containerLogger
@@ -359,9 +359,11 @@ func (vm *DockerVM) streamOutput(client dockerClient, containerName string) {
359359
case nil:
360360
containerLogger.Info(line)
361361
case io.EOF:
362-
dockerLogger.Infof("Container %s has closed its IO channel", containerName)
362+
logger.Infof("Container %s has closed its IO channel", containerName)
363+
return
363364
default:
364-
dockerLogger.Errorf("Error reading container output: %s", err)
365+
logger.Errorf("Error reading container output: %s", err)
366+
return
365367
}
366368
}
367369
}()

core/container/dockercontroller/dockercontroller_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
docker "github.com/fsouza/go-dockerclient"
22+
"github.com/hyperledger/fabric/common/flogging/floggingtest"
2223
"github.com/hyperledger/fabric/common/metrics/disabled"
2324
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
2425
"github.com/hyperledger/fabric/common/util"
@@ -28,6 +29,7 @@ import (
2829
coreutil "github.com/hyperledger/fabric/core/testutil"
2930
pb "github.com/hyperledger/fabric/protos/peer"
3031
. "github.com/onsi/gomega"
32+
"github.com/onsi/gomega/gbytes"
3133
"github.com/spf13/viper"
3234
"github.com/stretchr/testify/assert"
3335
"github.com/stretchr/testify/require"
@@ -179,6 +181,38 @@ func Test_Start(t *testing.T) {
179181
gt.Expect(err).NotTo(HaveOccurred())
180182
}
181183

184+
func Test_streamOutput(t *testing.T) {
185+
gt := NewGomegaWithT(t)
186+
187+
logger, recorder := floggingtest.NewTestLogger(t)
188+
containerLogger, containerRecorder := floggingtest.NewTestLogger(t)
189+
190+
client := &mockClient{}
191+
errCh := make(chan error, 1)
192+
optsCh := make(chan docker.AttachToContainerOptions, 1)
193+
client.attachToContainerStub = func(opts docker.AttachToContainerOptions) error {
194+
optsCh <- opts
195+
return <-errCh
196+
}
197+
198+
streamOutput(logger, client, "container-name", containerLogger)
199+
200+
var opts docker.AttachToContainerOptions
201+
gt.Eventually(optsCh).Should(Receive(&opts))
202+
gt.Eventually(opts.Success).Should(BeSent(struct{}{}))
203+
gt.Eventually(opts.Success).Should(BeClosed())
204+
205+
fmt.Fprintf(opts.OutputStream, "message-one\n")
206+
fmt.Fprintf(opts.OutputStream, "message-two") // does not get written
207+
gt.Eventually(containerRecorder).Should(gbytes.Say("message-one"))
208+
gt.Consistently(containerRecorder.Entries).Should(HaveLen(1))
209+
210+
close(errCh)
211+
gt.Eventually(recorder).Should(gbytes.Say("Container container-name has closed its IO channel"))
212+
gt.Consistently(recorder.Entries).Should(HaveLen(1))
213+
gt.Consistently(containerRecorder.Entries).Should(HaveLen(1))
214+
}
215+
182216
func Test_BuildMetric(t *testing.T) {
183217
ccid := ccintf.CCID{Name: "simple", Version: "1.0"}
184218
client := &mockClient{}
@@ -373,6 +407,8 @@ func (m *mockBuilder) Build() (io.Reader, error) {
373407
type mockClient struct {
374408
noSuchImgErrReturned bool
375409
pingErr bool
410+
411+
attachToContainerStub func(docker.AttachToContainerOptions) error
376412
}
377413

378414
var getClientErr, createErr, uploadErr, noSuchImgErr, buildErr, removeImgErr,
@@ -404,6 +440,9 @@ func (c *mockClient) UploadToContainer(id string, opts docker.UploadToContainerO
404440
}
405441

406442
func (c *mockClient) AttachToContainer(opts docker.AttachToContainerOptions) error {
443+
if c.attachToContainerStub != nil {
444+
return c.attachToContainerStub(opts)
445+
}
407446
if opts.Success != nil {
408447
opts.Success <- struct{}{}
409448
}

0 commit comments

Comments
 (0)