Skip to content

Commit

Permalink
Add retries when removing device mapper target (microsoft#1200)
Browse files Browse the repository at this point in the history
Add retries when removing device mapper target

Additionally ignore the error from device mapper if target removal
still fails. The corresponding layer path is already unmounted
at that point and this avoids having an inconsistent state.

Signed-off-by: Maksim An <maksiman@microsoft.com>
  • Loading branch information
anmaxvl authored and helsaawy committed Nov 3, 2021
1 parent 97ca2e0 commit 713a8e0
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 57 deletions.
2 changes: 2 additions & 0 deletions cmd/containerd-shim-runhcs-v1/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ var serveCommand = cli.Command{
}
defer sl.Close()

logrus.WithField("options", fmt.Sprintf("%+v", shimOpts)).Debug("containerd-shim-runhcs-v1::serve options")

serrs := make(chan error, 1)
defer close(serrs)
go func() {
Expand Down
19 changes: 13 additions & 6 deletions cmd/gcs/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//go:build linux
// +build linux

package main

import (
"context"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -166,13 +168,15 @@ func main() {
if err != nil {
logrus.WithError(err).Fatal("failed to initialize new runc runtime")
}
mux := bridge.NewBridgeMux()
b := bridge.Bridge{
Handler: mux,
EnableV4: *v4,
}

h := hcsv2.NewHost(rtime, tport)
b.AssignHandlers(mux, h)
b, err := bridge.NewBridge(bridge.WithHost(h),
bridge.WithV4Enabled(*v4),
bridge.WithCgroupVersion(1))
if err != nil {
logrus.WithError(err).Fatal("could not create bridge")
}
logrus.Debug("successfully created bridge")

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
Expand Down Expand Up @@ -248,6 +252,9 @@ func main() {
oomFile := os.NewFile(oom, "cefd")
defer oomFile.Close()

b.OomWatcher.Add("containers", containersControl)
go b.OomWatcher.Run(context.Background())
logrus.Warning("added cgrpup \"containers\" to oom watcher")
go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl)
go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl)
err = b.ListenAndServe(bridgeIn, bridgeOut)
Expand Down
2 changes: 1 addition & 1 deletion container.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type container struct {
waitCh chan struct{}
}

// createComputeSystemAdditionalJSON is read from the environment at initialisation
// createContainerAdditionalJSON is read from the environment at initialization
// time. It allows an environment variable to define additional JSON which
// is merged in the CreateComputeSystem call to HCS.
var createContainerAdditionalJSON []byte
Expand Down
41 changes: 23 additions & 18 deletions internal/gcs/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Microsoft/hcsshim/internal/hcs/schema1"
hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/logfields"
"github.com/Microsoft/hcsshim/internal/oc"
"go.opencensus.io/trace"
)
Expand All @@ -34,7 +35,7 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
ctx, span := trace.StartSpan(ctx, "gcs::GuestConnection::CreateContainer")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", cid))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, cid))

c := &Container{
gc: gc,
Expand All @@ -55,7 +56,7 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
if err != nil {
return nil, err
}
go c.waitBackground()
go c.waitBackground(ctx)
return c, nil
}

Expand All @@ -72,7 +73,7 @@ func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *C
if err != nil {
return nil, err
}
go c.waitBackground()
go c.waitBackground(ctx)
return c, nil
}

Expand All @@ -87,12 +88,12 @@ func (c *Container) IsOCI() bool {
return c.gc.os != "windows"
}

// Close releases associated with the container.
// Close releases resources associated with the container.
func (c *Container) Close() error {
c.closeOnce.Do(func() {
_, span := trace.StartSpan(context.Background(), "gcs::Container::Close")
defer span.End()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))
})
return nil
}
Expand All @@ -102,7 +103,7 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co
ctx, span := trace.StartSpan(ctx, "gcs::Container::CreateProcess")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

return c.gc.exec(ctx, c.id, config)
}
Expand All @@ -117,7 +118,7 @@ func (c *Container) Modify(ctx context.Context, config interface{}) (err error)
ctx, span := trace.StartSpan(ctx, "gcs::Container::Modify")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

req := containerModifySettings{
requestBase: makeRequest(ctx, c.id),
Expand All @@ -132,7 +133,7 @@ func (c *Container) Properties(ctx context.Context, types ...schema1.PropertyTyp
ctx, span := trace.StartSpan(ctx, "gcs::Container::Properties")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

req := containerGetProperties{
requestBase: makeRequest(ctx, c.id),
Expand All @@ -151,7 +152,7 @@ func (c *Container) PropertiesV2(ctx context.Context, types ...hcsschema.Propert
ctx, span := trace.StartSpan(ctx, "gcs::Container::PropertiesV2")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

req := containerGetPropertiesV2{
requestBase: makeRequest(ctx, c.id),
Expand All @@ -170,7 +171,7 @@ func (c *Container) Start(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "gcs::Container::Start")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

req := makeRequest(ctx, c.id)
var resp responseBase
Expand Down Expand Up @@ -201,7 +202,7 @@ func (c *Container) Shutdown(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "gcs::Container::Shutdown")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand All @@ -215,7 +216,7 @@ func (c *Container) Terminate(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "gcs::Container::Terminate")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute("cid", c.id))
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand All @@ -233,12 +234,16 @@ func (c *Container) Wait() error {
}
}

func (c *Container) waitBackground() {
ctx, span := trace.StartSpan(context.Background(), "gcs::Container::waitBackground")
// should only be called once; will wait for container exit and notify other
// `Wait()` calls, as well as update notification buffer
func (c *Container) waitBackground(ctx context.Context) {
var err error
ctx, span := trace.StartSpan(ctx, "gcs::Container::waitBackground")
defer span.End()
span.AddAttributes(trace.StringAttribute("cid", c.id))
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id))

err = c.Wait()

err := c.Wait()
log.G(ctx).Debug("container exited")
oc.SetSpanStatus(span, err)
log.G(ctx).WithField(logfields.ContainerID, c.id).Debug("container exited")
}
1 change: 1 addition & 0 deletions internal/gcs/guestconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error {

func (gc *GuestConnection) notify(ntf *containerNotification) error {
cid := ntf.ContainerID
logrus.WithField(logfields.ContainerID, cid).Info("received notification from guest")
gc.mu.Lock()
ch := gc.notifyChs[cid]
delete(gc.notifyChs, cid)
Expand Down
84 changes: 78 additions & 6 deletions internal/guest/bridge/bridge.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build linux
// +build linux

// Package bridge defines the bridge struct, which implements the control loop
Expand All @@ -19,10 +20,13 @@ import (
"time"

"github.com/Microsoft/hcsshim/internal/guest/gcserr"
hcsoom "github.com/Microsoft/hcsshim/internal/guest/oom"
"github.com/Microsoft/hcsshim/internal/guest/prot"
"github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/oc"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -132,7 +136,7 @@ func (mux *Mux) ServeMsg(r *Request) (RequestResponse, error) {
type Request struct {
// Context is the request context received from the bridge.
Context context.Context
// Header is the wire format message header that preceeded the message for
// Header is the wire format message header that preceded the message for
// this request.
Header *prot.MessageHeader
// ContainerID is the id of the container that this message corresponds to.
Expand All @@ -159,19 +163,54 @@ type bridgeResponse struct {
response interface{}
}

type bridgeOpts struct {
enableV4 bool
cgroupV2 bool
host *hcsv2.Host
}

type BridgeOpt func(*bridgeOpts) error

func WithV4Enabled(enableV4 bool) BridgeOpt {
return func(bo *bridgeOpts) error {
bo.enableV4 = enableV4
return nil
}
}

func WithCgroupVersion(version uint) BridgeOpt {
return func(bo *bridgeOpts) error {
if version < 1 || version > 2 {
return errors.New(fmt.Sprintf("unsupported CGroup version %d", version))
} else if version == 2 {
return errors.New("version 2 CGroups are currently not supported")
}

bo.cgroupV2 = false
return nil
}
}

func WithHost(h *hcsv2.Host) BridgeOpt {
return func(bo *bridgeOpts) error {
bo.host = h
return nil
}
}

// Bridge defines the bridge client in the GCS. It acts in many ways analogous
// to go's `http` package and multiplexer.
//
// It has two fundamentally different dispatch options:
//
// 1. Request/Response where using the `Handler` a request
// of a given type will be dispatched to the apprpriate handler
// and an appropriate response will respond to exactly that request that
// caused the dispatch.
// of a given type will be dispatched to the appropriate handler
// and an appropriate response will respond to exactly that request that
// caused the dispatch.
//
// 2. `PublishNotification` where a notification that was not initiated
// by a request from any client can be written to the bridge at any time
// in any order.
// by a request from any client can be written to the bridge at any time
// in any order.
type Bridge struct {
// Handler to invoke when messages are received.
Handler Handler
Expand All @@ -189,6 +228,39 @@ type Bridge struct {
hasQuitPending uint32

protVer prot.ProtocolVersion

// watch for OOM events, new cgroups can be added as they are created
// publishes them over responseChan via PublishNotification()
OomWatcher oom.Watcher
}

func NewBridge(opts ...BridgeOpt) (*Bridge, error) {
var bo bridgeOpts
for _, o := range opts {
err := o(&bo)
if err != nil {
return nil, errors.Wrap(err, "could not initialize bridge")
}
}

mux := NewBridgeMux()
b := Bridge{
Handler: mux,
EnableV4: bo.enableV4,
}

b.AssignHandlers(mux, bo.host)

publisher := hcsoom.BridgePublisherFunc(b.PublishNotification)
// TODO: (helsaawy) add v2 OOM watcher with v2 cgroups support
ep, err := oomv1.New(publisher)

if err != nil {
return nil, errors.Wrap(err, "could not create OOM watcher to add to bridge")
}
b.OomWatcher = ep

return &b, nil
}

// AssignHandlers creates and assigns the appropriate bridge
Expand Down
Loading

0 comments on commit 713a8e0

Please sign in to comment.