Skip to content

Commit

Permalink
fix: logging restart (#2697)
Browse files Browse the repository at this point in the history
Fix being able to stop and restart a container by moving the stop
processing from PreTerminates to PostStops hooks.

Fix log consumer being duplicated when restarting.

Fix race condition in log consumer test, cleaning up to flow to validate
correctly.
  • Loading branch information
stevenh authored Aug 16, 2024
1 parent 5024e26 commit 17b178f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 32 deletions.
19 changes: 9 additions & 10 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,26 +165,25 @@ var defaultCopyFileToContainerHook = func(files []ContainerFile) ContainerLifecy
var defaultLogConsumersHook = func(cfg *LogConsumerConfig) ContainerLifecycleHooks {
return ContainerLifecycleHooks{
PostStarts: []ContainerHook{
// first post-start hook is to produce logs and start log consumers
// Produce logs sending details to the log consumers.
// See combineContainerHooks for the order of execution.
func(ctx context.Context, c Container) error {
dockerContainer := c.(*DockerContainer)

if cfg == nil {
if cfg == nil || len(cfg.Consumers) == 0 {
return nil
}

dockerContainer := c.(*DockerContainer)
dockerContainer.consumers = dockerContainer.consumers[:0]
for _, consumer := range cfg.Consumers {
dockerContainer.followOutput(consumer)
}

if len(cfg.Consumers) > 0 {
return dockerContainer.startLogProduction(ctx, cfg.Opts...)
}
return nil
return dockerContainer.startLogProduction(ctx, cfg.Opts...)
},
},
PreTerminates: []ContainerHook{
// first pre-terminate hook is to stop the log production
PostStops: []ContainerHook{
// Stop the log production.
// See combineContainerHooks for the order of execution.
func(ctx context.Context, c Container) error {
if cfg == nil || len(cfg.Consumers) == 0 {
return nil
Expand Down
74 changes: 52 additions & 22 deletions logconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,23 +639,54 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
assert.False(t, strings.Contains(actual, logStoppedForOutOfSyncMessage))
}

// FooLogConsumer is a test log consumer that accepts logs from the
// "hello-world" Docker image, which prints out the "Hello from Docker!"
// log message.
type FooLogConsumer struct {
LogChannel chan string
t *testing.T
}

// Accept receives a log message and sends it to the log channel if it
// contains the "Hello from Docker!" message.
func (c FooLogConsumer) Accept(rawLog Log) {
log := string(rawLog.Content)
c.LogChannel <- log
if strings.Contains(log, "Hello from Docker!") {
select {
case c.LogChannel <- log:
default:
}
}
}

// AssertRead waits for a log message to be received.
func (c FooLogConsumer) AssertRead() {
select {
case <-c.LogChannel:
case <-time.After(5 * time.Second):
c.t.Fatal("receive timeout")
}
}

// SlurpOne reads a value from the channel if it is available.
func (c FooLogConsumer) SlurpOne() {
select {
case <-c.LogChannel:
default:
}
}

func NewFooLogConsumer() *FooLogConsumer {
func NewFooLogConsumer(t *testing.T) *FooLogConsumer {
t.Helper()

return &FooLogConsumer{
LogChannel: make(chan string),
t: t,
LogChannel: make(chan string, 2),
}
}

func TestRestartContainerWithLogConsumer(t *testing.T) {
logConsumer := NewFooLogConsumer()
logConsumer := NewFooLogConsumer(t)

ctx := context.Background()
container, err := GenericContainer(ctx, GenericContainerRequest{
Expand All @@ -668,28 +699,27 @@ func TestRestartContainerWithLogConsumer(t *testing.T) {
},
Started: false,
})
if err != nil {
t.Fatalf("Cant create container: %s", err.Error())
}
terminateContainerOnEnd(t, ctx, container)
require.NoError(t, err)

// Start and confirm that the log consumer receives the log message.
err = container.Start(ctx)
if err != nil {
t.Fatalf("Cant start container: %s", err.Error())
}
require.NoError(t, err)

logConsumer.AssertRead()

d := 30 * time.Second
// Stop the container and clear any pending message.
d := 5 * time.Second
err = container.Stop(ctx, &d)
if err != nil {
t.Fatalf("Cant stop container: %s", err.Error())
}
require.NoError(t, err)

logConsumer.SlurpOne()

// Restart the container and confirm that the log consumer receives new log messages.
err = container.Start(ctx)
if err != nil {
t.Fatalf("Cant start container: %s", err.Error())
}
require.NoError(t, err)

for s := range logConsumer.LogChannel {
if strings.Contains(s, "Hello from Docker!") {
break
}
}
// First message is from the first start.
logConsumer.AssertRead()
logConsumer.AssertRead()
}

0 comments on commit 17b178f

Please sign in to comment.