Skip to content

Commit

Permalink
Remove blocking wait on container exit for every exec created
Browse files Browse the repository at this point in the history
Commit fixes the memory leak seen in the shim.
It removes creation of channel that waits on container exit
for every new exec. Instead, the container wait channel is exposed
through WaitChannel() function which callers can use to decide
if container has exited or not.

Signed-off-by: Kirtana Ashok <Kirtana.Ashok@microsoft.com>
  • Loading branch information
Kirtana Ashok authored and Kirtana Ashok committed Dec 9, 2022
1 parent 07a19e3 commit d98b48d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 26 deletions.
8 changes: 2 additions & 6 deletions cmd/containerd-shim-runhcs-v1/exec_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,9 @@ func (he *hcsExec) waitForContainerExit() {
trace.StringAttribute("tid", he.tid),
trace.StringAttribute("eid", he.id))

cexit := make(chan struct{})
go func() {
_ = he.c.Wait()
close(cexit)
}()
// wait for container or process to exit and ckean up resrources
select {
case <-cexit:
case <-he.c.WaitChannel():
// Container exited first. We need to force the process into the exited
// state and cleanup any resources
he.sl.Lock()
Expand Down
4 changes: 4 additions & 0 deletions internal/cow/cow.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type Container interface {
// container to be terminated by some error condition (including calling
// Close).
Wait() error
// WaitChannel returns the wait channel of the container
WaitChannel() <-chan struct{}
// WaitError returns the container termination error
WaitError() error
// Modify sends a request to modify container resources
Modify(ctx context.Context, config interface{}) error
}
48 changes: 32 additions & 16 deletions internal/gcs/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Container struct {
notifyCh chan struct{}
closeCh chan struct{}
closeOnce sync.Once
waitBlock chan struct{}
waitError error
}

var _ cow.Container = &Container{}
Expand All @@ -39,10 +41,11 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
span.AddAttributes(trace.StringAttribute("cid", cid))

c := &Container{
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
waitBlock: make(chan struct{}),
}
err = gc.requestNotify(cid, c.notifyCh)
if err != nil {
Expand All @@ -65,10 +68,11 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
// container that is already running inside the UVM (after cloning).
func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) {
c := &Container{
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
waitBlock: make(chan struct{}),
}
err = gc.requestNotify(cid, c.notifyCh)
if err != nil {
Expand All @@ -95,6 +99,8 @@ func (c *Container) Close() error {
_, span := oc.StartSpan(context.Background(), "gcs::Container::Close")
defer span.End()
span.AddAttributes(trace.StringAttribute("cid", c.id))

close(c.closeCh)
})
return nil
}
Expand Down Expand Up @@ -224,23 +230,33 @@ func (c *Container) Terminate(ctx context.Context) (err error) {
return c.shutdown(ctx, rpcShutdownForced)
}

func (c *Container) WaitChannel() <-chan struct{} {
return c.waitBlock
}

func (c *Container) WaitError() error {
return c.waitError
}

// Wait waits for the container to terminate (or Close to be called, or the
// guest connection to terminate).
func (c *Container) Wait() error {
select {
case <-c.notifyCh:
return nil
case <-c.closeCh:
return errors.New("container closed")
}
<-c.WaitChannel()
return c.WaitError()
}

func (c *Container) waitBackground() {
ctx, span := oc.StartSpan(context.Background(), "gcs::Container::waitBackground")
defer span.End()
span.AddAttributes(trace.StringAttribute("cid", c.id))

err := c.Wait()
select {
case <-c.notifyCh:
case <-c.closeCh:
c.waitError = errors.New("container closed")
}
close(c.waitBlock)

log.G(ctx).Debug("container exited")
oc.SetSpanStatus(span, err)
oc.SetSpanStatus(span, c.waitError)
}
12 changes: 10 additions & 2 deletions internal/hcs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,19 @@ func (computeSystem *System) waitBackground() {
oc.SetSpanStatus(span, err)
}

func (computeSystem *System) WaitChannel() <-chan struct{} {
return computeSystem.waitBlock
}

func (computeSystem *System) WaitError() error {
return computeSystem.waitError
}

// Wait synchronously waits for the compute system to shutdown or terminate. If
// the compute system has already exited returns the previous error (if any).
func (computeSystem *System) Wait() error {
<-computeSystem.waitBlock
return computeSystem.waitError
<-computeSystem.WaitChannel()
return computeSystem.WaitError()
}

// ExitError returns an error describing the reason the compute system terminated.
Expand Down
12 changes: 10 additions & 2 deletions internal/jobcontainers/jobcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,19 @@ func (c *JobContainer) Terminate(ctx context.Context) error {
return nil
}

func (c *JobContainer) WaitChannel() <-chan struct{} {
return c.waitBlock
}

func (c *JobContainer) WaitError() error {
return c.waitError
}

// Wait synchronously waits for the container to shutdown or terminate. If
// the container has already exited returns the previous error (if any).
func (c *JobContainer) Wait() error {
<-c.waitBlock
return c.waitError
<-c.WaitChannel()
return c.WaitError()
}

func (c *JobContainer) waitBackground(ctx context.Context) {
Expand Down

0 comments on commit d98b48d

Please sign in to comment.