From d98b48d33654a04b22c9d5da8e50741240a7018a Mon Sep 17 00:00:00 2001 From: Kirtana Ashok Date: Fri, 9 Dec 2022 12:16:51 -0800 Subject: [PATCH] Remove blocking wait on container exit for every exec created Commit fixes the memory leak seen in the shim. It removes creation of channel that waits on container exit for every new exec. Instead, the container wait channel is exposed through WaitChannel() function which callers can use to decide if container has exited or not. Signed-off-by: Kirtana Ashok --- cmd/containerd-shim-runhcs-v1/exec_hcs.go | 8 +--- internal/cow/cow.go | 4 ++ internal/gcs/container.go | 48 +++++++++++++++-------- internal/hcs/system.go | 12 +++++- internal/jobcontainers/jobcontainer.go | 12 +++++- 5 files changed, 58 insertions(+), 26 deletions(-) diff --git a/cmd/containerd-shim-runhcs-v1/exec_hcs.go b/cmd/containerd-shim-runhcs-v1/exec_hcs.go index ad1c6ffaf2..ca444d60c8 100644 --- a/cmd/containerd-shim-runhcs-v1/exec_hcs.go +++ b/cmd/containerd-shim-runhcs-v1/exec_hcs.go @@ -493,13 +493,9 @@ func (he *hcsExec) waitForContainerExit() { trace.StringAttribute("tid", he.tid), trace.StringAttribute("eid", he.id)) - cexit := make(chan struct{}) - go func() { - _ = he.c.Wait() - close(cexit) - }() + // wait for container or process to exit and clean up resources select { - case <-cexit: + case <-he.c.WaitChannel(): // Container exited first. We need to force the process into the exited // state and cleanup any resources he.sl.Lock() diff --git a/internal/cow/cow.go b/internal/cow/cow.go index c6eeb167b9..cb8b76c6de 100644 --- a/internal/cow/cow.go +++ b/internal/cow/cow.go @@ -88,6 +88,10 @@ type Container interface { // container to be terminated by some error condition (including calling // Close). 
Wait() error + // WaitChannel returns the wait channel of the container + WaitChannel() <-chan struct{} + // WaitError returns the container termination error + WaitError() error // Modify sends a request to modify container resources Modify(ctx context.Context, config interface{}) error } diff --git a/internal/gcs/container.go b/internal/gcs/container.go index 84889a8608..4172d37daf 100644 --- a/internal/gcs/container.go +++ b/internal/gcs/container.go @@ -26,6 +26,8 @@ type Container struct { notifyCh chan struct{} closeCh chan struct{} closeOnce sync.Once + waitBlock chan struct{} + waitError error } var _ cow.Container = &Container{} @@ -39,10 +41,11 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf span.AddAttributes(trace.StringAttribute("cid", cid)) c := &Container{ - gc: gc, - id: cid, - notifyCh: make(chan struct{}), - closeCh: make(chan struct{}), + gc: gc, + id: cid, + notifyCh: make(chan struct{}), + closeCh: make(chan struct{}), + waitBlock: make(chan struct{}), } err = gc.requestNotify(cid, c.notifyCh) if err != nil { @@ -65,10 +68,11 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf // container that is already running inside the UVM (after cloning). 
func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) { c := &Container{ - gc: gc, - id: cid, - notifyCh: make(chan struct{}), - closeCh: make(chan struct{}), + gc: gc, + id: cid, + notifyCh: make(chan struct{}), + closeCh: make(chan struct{}), + waitBlock: make(chan struct{}), } err = gc.requestNotify(cid, c.notifyCh) if err != nil { @@ -95,6 +99,8 @@ func (c *Container) Close() error { _, span := oc.StartSpan(context.Background(), "gcs::Container::Close") defer span.End() span.AddAttributes(trace.StringAttribute("cid", c.id)) + + close(c.closeCh) }) return nil } @@ -224,15 +230,19 @@ func (c *Container) Terminate(ctx context.Context) (err error) { return c.shutdown(ctx, rpcShutdownForced) } +func (c *Container) WaitChannel() <-chan struct{} { + return c.waitBlock +} + +func (c *Container) WaitError() error { + return c.waitError +} + // Wait waits for the container to terminate (or Close to be called, or the // guest connection to terminate). 
func (c *Container) Wait() error { - select { - case <-c.notifyCh: - return nil - case <-c.closeCh: - return errors.New("container closed") - } + <-c.WaitChannel() + return c.WaitError() } func (c *Container) waitBackground() { @@ -240,7 +250,13 @@ func (c *Container) waitBackground() { defer span.End() span.AddAttributes(trace.StringAttribute("cid", c.id)) - err := c.Wait() + select { + case <-c.notifyCh: + case <-c.closeCh: + c.waitError = errors.New("container closed") + } + close(c.waitBlock) + log.G(ctx).Debug("container exited") - oc.SetSpanStatus(span, err) + oc.SetSpanStatus(span, c.waitError) } diff --git a/internal/hcs/system.go b/internal/hcs/system.go index 4b51ce6dee..1469eb8e06 100644 --- a/internal/hcs/system.go +++ b/internal/hcs/system.go @@ -291,11 +291,19 @@ func (computeSystem *System) waitBackground() { oc.SetSpanStatus(span, err) } +func (computeSystem *System) WaitChannel() <-chan struct{} { + return computeSystem.waitBlock +} + +func (computeSystem *System) WaitError() error { + return computeSystem.waitError +} + // Wait synchronously waits for the compute system to shutdown or terminate. If // the compute system has already exited returns the previous error (if any). func (computeSystem *System) Wait() error { - <-computeSystem.waitBlock - return computeSystem.waitError + <-computeSystem.WaitChannel() + return computeSystem.WaitError() } // ExitError returns an error describing the reason the compute system terminated. 
diff --git a/internal/jobcontainers/jobcontainer.go b/internal/jobcontainers/jobcontainer.go index 53db021d3c..e68a4820e4 100644 --- a/internal/jobcontainers/jobcontainer.go +++ b/internal/jobcontainers/jobcontainer.go @@ -626,11 +626,19 @@ func (c *JobContainer) Terminate(ctx context.Context) error { return nil } +func (c *JobContainer) WaitChannel() <-chan struct{} { + return c.waitBlock +} + +func (c *JobContainer) WaitError() error { + return c.waitError +} + // Wait synchronously waits for the container to shutdown or terminate. If // the container has already exited returns the previous error (if any). func (c *JobContainer) Wait() error { - <-c.waitBlock - return c.waitError + <-c.WaitChannel() + return c.WaitError() } func (c *JobContainer) waitBackground(ctx context.Context) {