Skip to content

(refactoring) introduce monitor to manage containers events and application termination #12906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/formatter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func NewLogConsumer(ctx context.Context, stdout, stderr io.Writer, color, prefix
}
}

func (l *logConsumer) Register(name string) {
l.register(name)
}

func (l *logConsumer) register(name string) *presenter {
var p *presenter
root, _, found := strings.Cut(name, " ")
Expand Down
31 changes: 17 additions & 14 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,24 +640,25 @@ type LogConsumer interface {
Log(containerName, message string)
Err(containerName, message string)
Status(container, msg string)
Register(container string)
}

// ContainerEventListener is a callback to process ContainerEvent from services
type ContainerEventListener func(event ContainerEvent)

// ContainerEvent notify an event has been collected on source container implementing Service
type ContainerEvent struct {
Type int
// Container is the name of the container _without the project prefix_.
Type int
Time int64
Container *ContainerSummary
// Source is the name of the container _without the project prefix_.
//
// This is only suitable for display purposes within Compose, as it's
// not guaranteed to be unique across services.
Container string
ID string
Service string
Line string
// ContainerEventExit only
Source string
ID string
Service string
Line string
// ExitCode is only set on ContainerEventExited events
ExitCode int
Restarting bool
}
Expand All @@ -667,17 +668,19 @@ const (
ContainerEventLog = iota
// ContainerEventErr is a ContainerEvent of type log on stderr. Line is set
ContainerEventErr
// ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container
ContainerEventAttach
// ContainerEventStarted let consumer know a container has been started
ContainerEventStarted
// ContainerEventRestarted let consumer know a container has been restarted
ContainerEventRestarted
// ContainerEventStopped is a ContainerEvent of type stopped.
ContainerEventStopped
// ContainerEventCreated let consumer know a new container has been created
ContainerEventCreated
// ContainerEventRecreated let consumer know container stopped but his being replaced
ContainerEventRecreated
// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
ContainerEventExit
// ContainerEventExited is a ContainerEvent of type exit. ExitCode is set
ContainerEventExited
// UserCancel user cancelled compose up, we are stopping containers
UserCancel
// HookEventLog is a ContainerEvent of type log on stdout by service hook
HookEventLog
)

Expand Down
46 changes: 21 additions & 25 deletions pkg/compose/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,41 +61,37 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
}

func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error {
serviceName := container.Labels[api.ServiceLabel]
containerName := getContainerNameWithoutProject(container)

listener(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: containerName,
ID: container.ID,
Service: serviceName,
})
service := container.Labels[api.ServiceLabel]
name := getContainerNameWithoutProject(container)
return s.doAttachContainer(ctx, service, container.ID, name, listener)
}

func (s *composeService) doAttachContainer(ctx context.Context, service, id, name string, listener api.ContainerEventListener) error {
inspect, err := s.apiClient().ContainerInspect(ctx, id)
if err != nil {
return err
}

wOut := utils.GetWriter(func(line string) {
listener(api.ContainerEvent{
Type: api.ContainerEventLog,
Container: containerName,
ID: container.ID,
Service: serviceName,
Line: line,
Type: api.ContainerEventLog,
Source: name,
ID: id,
Service: service,
Line: line,
})
})
wErr := utils.GetWriter(func(line string) {
listener(api.ContainerEvent{
Type: api.ContainerEventErr,
Container: containerName,
ID: container.ID,
Service: serviceName,
Line: line,
Type: api.ContainerEventErr,
Source: name,
ID: id,
Service: service,
Line: line,
})
})

inspect, err := s.apiClient().ContainerInspect(ctx, container.ID)
if err != nil {
return err
}

_, _, err = s.attachContainerStreams(ctx, container.ID, inspect.Config.Tty, nil, wOut, wErr)
_, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr)
return err
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/compose/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ func isService(services ...string) containerPredicate {
}
}

func isRunning() containerPredicate {
return func(c container.Summary) bool {
return c.State == "running"
}
}

// isOrphaned is a predicate to select containers without a matching service definition in compose project
func isOrphaned(project *types.Project) containerPredicate {
services := append(project.ServiceNames(), project.DisabledServiceNames()...)
Expand Down
10 changes: 5 additions & 5 deletions pkg/compose/convergence.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,23 +640,23 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
UseNetworkAliases: true,
Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replaced.ID),
}
created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w)
timeoutInSecond := utils.DurationSecondToInt(timeout)
err = s.apiClient().ContainerStop(ctx, replaced.ID, containerType.StopOptions{Timeout: timeoutInSecond})
if err != nil {
return created, err
}

timeoutInSecond := utils.DurationSecondToInt(timeout)
err = s.apiClient().ContainerStop(ctx, replaced.ID, containerType.StopOptions{Timeout: timeoutInSecond})
err = s.apiClient().ContainerRename(ctx, replaced.ID, tmpName)
if err != nil {
return created, err
}

err = s.apiClient().ContainerRemove(ctx, replaced.ID, containerType.RemoveOptions{})
created, err = s.createMobyContainer(ctx, project, service, name, number, inherited, opts, w)
if err != nil {
return created, err
}

err = s.apiClient().ContainerRename(ctx, created.ID, name)
err = s.apiClient().ContainerRemove(ctx, replaced.ID, containerType.RemoveOptions{})
if err != nil {
return created, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/compose/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
func (s composeService) runHook(ctx context.Context, ctr container.Summary, service types.ServiceConfig, hook types.ServiceHook, listener api.ContainerEventListener) error {
wOut := utils.GetWriter(func(line string) {
listener(api.ContainerEvent{
Type: api.HookEventLog,
Container: getContainerNameWithoutProject(ctr) + " ->",
ID: ctr.ID,
Service: service.Name,
Line: line,
Type: api.HookEventLog,
Source: getContainerNameWithoutProject(ctr) + " ->",
ID: ctr.ID,
Service: service.Name,
Line: line,
})
})
defer wOut.Close() //nolint:errcheck
Expand Down
68 changes: 25 additions & 43 deletions pkg/compose/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"io"
"time"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/errdefs"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (s *composeService) Logs(
eg, ctx := errgroup.WithContext(ctx)
for _, ctr := range containers {
eg.Go(func() error {
err := s.logContainers(ctx, consumer, ctr, options)
err := s.logContainer(ctx, consumer, ctr, options)
var notImplErr errdefs.ErrNotImplemented
if errors.As(err, &notImplErr) {
logrus.Warnf("Can't retrieve logs for %q: %s", getCanonicalContainerName(ctr), err.Error())
Expand All @@ -74,34 +73,22 @@ func (s *composeService) Logs(
}

if options.Follow {
containers = containers.filter(isRunning())
printer := newLogPrinter(consumer)
eg.Go(func() error {
_, err := printer.Run(api.CascadeIgnore, "", nil)
return err
})

for _, c := range containers {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
ID: c.ID,
Service: c.Labels[api.ServiceLabel],
})
}

eg.Go(func() error {
err := s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c container.Summary, t time.Time) error {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c),
ID: c.ID,
Service: c.Labels[api.ServiceLabel],
})
monitor := newMonitor(s.apiClient(), projectName)
monitor.withServices(options.Services)
monitor.withListener(printer.HandleEvent)
monitor.withListener(func(event api.ContainerEvent) {
if event.Type == api.ContainerEventStarted {
eg.Go(func() error {
err := s.logContainers(ctx, consumer, c, api.LogOptions{
ctr, err := s.apiClient().ContainerInspect(ctx, event.ID)
if err != nil {
return err
}

err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
Follow: options.Follow,
Since: t.Format(time.RFC3339Nano),
Since: ctr.State.StartedAt,
Until: options.Until,
Tail: options.Tail,
Timestamps: options.Timestamps,
Expand All @@ -113,31 +100,27 @@ func (s *composeService) Logs(
}
return err
})
return nil
}, func(c container.Summary, t time.Time) error {
printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach,
Container: "", // actual name will be set by start event
ID: c.ID,
Service: c.Labels[api.ServiceLabel],
})
return nil
})
printer.Stop()
return err
}
})
eg.Go(func() error {
return monitor.Start(ctx)
})
}

return eg.Wait()
}

func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
cnt, err := s.apiClient().ContainerInspect(ctx, c.ID)
func (s *composeService) logContainer(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
ctr, err := s.apiClient().ContainerInspect(ctx, c.ID)
if err != nil {
return err
}
name := getContainerNameWithoutProject(c)
return s.doLogContainer(ctx, consumer, name, ctr, options)
}

r, err := s.apiClient().ContainerLogs(ctx, cnt.ID, container.LogsOptions{
func (s *composeService) doLogContainer(ctx context.Context, consumer api.LogConsumer, name string, ctr container.InspectResponse, options api.LogOptions) error {
r, err := s.apiClient().ContainerLogs(ctx, ctr.ID, container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: options.Follow,
Expand All @@ -151,11 +134,10 @@ func (s *composeService) logContainers(ctx context.Context, consumer api.LogCons
}
defer r.Close() //nolint:errcheck

name := getContainerNameWithoutProject(c)
w := utils.GetWriter(func(line string) {
consumer.Log(name, line)
})
if cnt.Config.Tty {
if ctr.Config.Tty {
_, err = io.Copy(w, r)
} else {
_, err = stdcopy.StdCopy(w, w, r)
Expand Down
2 changes: 0 additions & 2 deletions pkg/compose/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ func (l *testLogConsumer) Err(containerName, message string) {

func (l *testLogConsumer) Status(containerName, msg string) {}

func (l *testLogConsumer) Register(containerName string) {}

func (l *testLogConsumer) LogsForContainer(containerName string) []string {
l.mu.Lock()
defer l.mu.Unlock()
Expand Down
Loading
Loading