From 713a8e00e458ba34e21e5bc4309270a661ab4b83 Mon Sep 17 00:00:00 2001 From: Maksim An Date: Tue, 2 Nov 2021 21:51:50 -0700 Subject: [PATCH] Add retries when removing device mapper target (#1200) Add retries when removing device mapper target Additionally ignore the error from device mapper if target removal still fails. The corresponding layer path is already unmounted at that point and this avoids having an inconsistent state. Signed-off-by: Maksim An --- cmd/containerd-shim-runhcs-v1/serve.go | 2 + cmd/gcs/main.go | 19 +++-- container.go | 2 +- internal/gcs/container.go | 41 +++++---- internal/gcs/guestconnection.go | 1 + internal/guest/bridge/bridge.go | 84 +++++++++++++++++-- internal/guest/bridge/bridge_v2.go | 28 ++++--- internal/guest/oom/oom.go | 70 ++++++++++++++++ internal/guest/runtime/hcsv2/container.go | 20 ++++- .../storage/devicemapper/devicemapper.go | 41 +++++++-- .../storage/devicemapper/devicemapper_test.go | 83 ++++++++++++++++++ internal/guest/storage/pmem/pmem.go | 6 +- internal/logfields/fields.go | 2 + 13 files changed, 342 insertions(+), 57 deletions(-) create mode 100644 internal/guest/oom/oom.go diff --git a/cmd/containerd-shim-runhcs-v1/serve.go b/cmd/containerd-shim-runhcs-v1/serve.go index 65f6a5dcaf..f1b6267b5d 100644 --- a/cmd/containerd-shim-runhcs-v1/serve.go +++ b/cmd/containerd-shim-runhcs-v1/serve.go @@ -199,6 +199,8 @@ var serveCommand = cli.Command{ } defer sl.Close() + logrus.WithField("options", fmt.Sprintf("%+v", shimOpts)).Debug("containerd-shim-runhcs-v1::serve options") + serrs := make(chan error, 1) defer close(serrs) go func() { diff --git a/cmd/gcs/main.go b/cmd/gcs/main.go index 7dacf5d5b8..10c26fa560 100644 --- a/cmd/gcs/main.go +++ b/cmd/gcs/main.go @@ -1,8 +1,10 @@ +//go:build linux // +build linux package main import ( + "context" "flag" "fmt" "io" @@ -166,13 +168,15 @@ func main() { if err != nil { logrus.WithError(err).Fatal("failed to initialize new runc runtime") } - mux := bridge.NewBridgeMux() - b := bridge.Bridge{ - Handler: mux, - EnableV4: *v4, - } + h := hcsv2.NewHost(rtime, tport) - b.AssignHandlers(mux, h) + b, err := bridge.NewBridge(bridge.WithHost(h), + bridge.WithV4Enabled(*v4), + bridge.WithCgroupVersion(1)) + if err != nil { + logrus.WithError(err).Fatal("could not create bridge") + } + logrus.Debug("successfully created bridge") var bridgeIn io.ReadCloser var bridgeOut io.WriteCloser @@ -248,6 +252,9 @@ func main() { oomFile := os.NewFile(oom, "cefd") defer oomFile.Close() + b.OomWatcher.Add("containers", containersControl) + go b.OomWatcher.Run(context.Background()) + logrus.Warning("added cgrpup \"containers\" to oom watcher") go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl) go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl) err = b.ListenAndServe(bridgeIn, bridgeOut) diff --git a/container.go b/container.go index bfd722898e..1de9625b91 100644 --- a/container.go +++ b/container.go @@ -60,7 +60,7 @@ type container struct { waitCh chan struct{} } -// createComputeSystemAdditionalJSON is read from the environment at initialisation +// createContainerAdditionalJSON is read from the environment at initialization // time. It allows an environment variable to define additional JSON which // is merged in the CreateComputeSystem call to HCS. var createContainerAdditionalJSON []byte diff --git a/internal/gcs/container.go b/internal/gcs/container.go index 6ec5b8b84f..1534682659 100644 --- a/internal/gcs/container.go +++ b/internal/gcs/container.go @@ -10,6 +10,7 @@ import ( "github.com/Microsoft/hcsshim/internal/hcs/schema1" hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" "github.com/Microsoft/hcsshim/internal/log" + "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/oc" "go.opencensus.io/trace" ) @@ -34,7 +35,7 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf ctx, span := trace.StartSpan(ctx, "gcs::GuestConnection::CreateContainer") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", cid)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, cid)) c := &Container{ gc: gc, @@ -55,7 +56,7 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf if err != nil { return nil, err } - go c.waitBackground() + go c.waitBackground(ctx) return c, nil } @@ -72,7 +73,7 @@ func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *C if err != nil { return nil, err } - go c.waitBackground() + go c.waitBackground(ctx) return c, nil } @@ -87,12 +88,12 @@ func (c *Container) IsOCI() bool { return c.gc.os != "windows" } -// Close releases associated with the container. +// Close releases resources associated with the container. func (c *Container) Close() error { c.closeOnce.Do(func() { _, span := trace.StartSpan(context.Background(), "gcs::Container::Close") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) }) return nil } @@ -102,7 +103,7 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co ctx, span := trace.StartSpan(ctx, "gcs::Container::CreateProcess") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) return c.gc.exec(ctx, c.id, config) } @@ -117,7 +118,7 @@ func (c *Container) Modify(ctx context.Context, config interface{}) (err error) ctx, span := trace.StartSpan(ctx, "gcs::Container::Modify") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := containerModifySettings{ requestBase: makeRequest(ctx, c.id), @@ -132,7 +133,7 @@ func (c *Container) Properties(ctx context.Context, types ...schema1.PropertyTyp ctx, span := trace.StartSpan(ctx, "gcs::Container::Properties") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := containerGetProperties{ requestBase: makeRequest(ctx, c.id), @@ -151,7 +152,7 @@ func (c *Container) PropertiesV2(ctx context.Context, types ...hcsschema.Propert ctx, span := trace.StartSpan(ctx, "gcs::Container::PropertiesV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := containerGetPropertiesV2{ requestBase: makeRequest(ctx, c.id), @@ -170,7 +171,7 @@ func (c *Container) Start(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "gcs::Container::Start") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := makeRequest(ctx, c.id) var resp responseBase @@ -201,7 +202,7 @@ func (c *Container) Shutdown(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "gcs::Container::Shutdown") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -215,7 +216,7 @@ func (c *Container) Terminate(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "gcs::Container::Terminate") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -233,12 +234,16 @@ func (c *Container) Wait() error { } } -func (c *Container) waitBackground() { - ctx, span := trace.StartSpan(context.Background(), "gcs::Container::waitBackground") +// should only be called once; will wait for container exit and notify other +// `Wait()` calls, as well as update notification buffer +func (c *Container) waitBackground(ctx context.Context) { + var err error + ctx, span := trace.StartSpan(ctx, "gcs::Container::waitBackground") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) + + err = c.Wait() - err := c.Wait() - log.G(ctx).Debug("container exited") - oc.SetSpanStatus(span, err) + log.G(ctx).WithField(logfields.ContainerID, c.id).Debug("container exited") } diff --git a/internal/gcs/guestconnection.go b/internal/gcs/guestconnection.go index f01b97939b..8f4fa498e0 100644 --- a/internal/gcs/guestconnection.go +++ b/internal/gcs/guestconnection.go @@ -249,6 +249,7 @@ func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error { func (gc *GuestConnection) notify(ntf *containerNotification) error { cid := ntf.ContainerID + logrus.WithField(logfields.ContainerID, cid).Info("received notification from guest") gc.mu.Lock() ch := gc.notifyChs[cid] delete(gc.notifyChs, cid) diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go index 7a399820cb..dd2e58d3f3 100644 --- a/internal/guest/bridge/bridge.go +++ b/internal/guest/bridge/bridge.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux // Package bridge defines the bridge struct, which implements the control loop @@ -19,10 +20,13 @@ import ( "time" "github.com/Microsoft/hcsshim/internal/guest/gcserr" + hcsoom "github.com/Microsoft/hcsshim/internal/guest/oom" "github.com/Microsoft/hcsshim/internal/guest/prot" "github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2" "github.com/Microsoft/hcsshim/internal/log" "github.com/Microsoft/hcsshim/internal/oc" + "github.com/containerd/containerd/pkg/oom" + oomv1 "github.com/containerd/containerd/pkg/oom/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -132,7 +136,7 @@ func (mux *Mux) ServeMsg(r *Request) (RequestResponse, error) { type Request struct { // Context is the request context received from the bridge. Context context.Context - // Header is the wire format message header that preceeded the message for + // Header is the wire format message header that preceded the message for // this request. Header *prot.MessageHeader // ContainerID is the id of the container that this message corresponds to. @@ -159,19 +163,54 @@ type bridgeResponse struct { response interface{} } +type bridgeOpts struct { + enableV4 bool + cgroupV2 bool + host *hcsv2.Host +} + +type BridgeOpt func(*bridgeOpts) error + +func WithV4Enabled(enableV4 bool) BridgeOpt { + return func(bo *bridgeOpts) error { + bo.enableV4 = enableV4 + return nil + } +} + +func WithCgroupVersion(version uint) BridgeOpt { + return func(bo *bridgeOpts) error { + if version < 1 || version > 2 { + return errors.New(fmt.Sprintf("unsupported CGroup version %d", version)) + } else if version == 2 { + return errors.New("version 2 CGroups are currently not supported") + } + + bo.cgroupV2 = false + return nil + } +} + +func WithHost(h *hcsv2.Host) BridgeOpt { + return func(bo *bridgeOpts) error { + bo.host = h + return nil + } +} + // Bridge defines the bridge client in the GCS. It acts in many ways analogous // to go's `http` package and multiplexer. // // It has two fundamentally different dispatch options: // // 1. Request/Response where using the `Handler` a request -// of a given type will be dispatched to the apprpriate handler -// and an appropriate response will respond to exactly that request that -// caused the dispatch. +// of a given type will be dispatched to the appropriate handler +// and an appropriate response will respond to exactly that request that +// caused the dispatch. // // 2. `PublishNotification` where a notification that was not initiated -// by a request from any client can be written to the bridge at any time -// in any order. +// by a request from any client can be written to the bridge at any time +// in any order. type Bridge struct { // Handler to invoke when messages are received. Handler Handler @@ -189,6 +228,39 @@ type Bridge struct { hasQuitPending uint32 protVer prot.ProtocolVersion + + // watch for OOM events, new cgroups can be added as they are created + // publishes them over responseChan via PublishNotification() + OomWatcher oom.Watcher +} + +func NewBridge(opts ...BridgeOpt) (*Bridge, error) { + var bo bridgeOpts + for _, o := range opts { + err := o(&bo) + if err != nil { + return nil, errors.Wrap(err, "could not initialize bridge") + } + } + + mux := NewBridgeMux() + b := Bridge{ + Handler: mux, + EnableV4: bo.enableV4, + } + + b.AssignHandlers(mux, bo.host) + + publisher := hcsoom.BridgePublisherFunc(b.PublishNotification) + // TODO: (helsaawy) add v2 OOM watcher with v2 cgroups support + ep, err := oomv1.New(publisher) + + if err != nil { + return nil, errors.Wrap(err, "could not create OOM watcher to add to bridge") + } + b.OomWatcher = ep + + return &b, nil } // AssignHandlers creates and assigns the appropriate bridge diff --git a/internal/guest/bridge/bridge_v2.go b/internal/guest/bridge/bridge_v2.go index 460579b524..062484fbf7 100644 --- a/internal/guest/bridge/bridge_v2.go +++ b/internal/guest/bridge/bridge_v2.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package bridge @@ -15,6 +16,7 @@ import ( "github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2" "github.com/Microsoft/hcsshim/internal/guest/stdio" "github.com/Microsoft/hcsshim/internal/log" + "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/oc" "github.com/pkg/errors" "go.opencensus.io/trace" @@ -47,7 +49,7 @@ func (b *Bridge) negotiateProtocolV2(r *Request) (_ RequestResponse, err error) _, span := trace.StartSpan(r.Context, "opengcs::bridge::negotiateProtocolV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.NegotiateProtocol if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -83,7 +85,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::createContainerV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerCreate if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -136,7 +138,7 @@ func (b *Bridge) startContainerV2(r *Request) (_ RequestResponse, err error) { _, span := trace.StartSpan(r.Context, "opengcs::bridge::startContainerV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) // This is just a noop, but needs to be handled so that an error isn't // returned to the HCS. @@ -167,7 +169,7 @@ func (b *Bridge) execProcessV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::execProcessV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerExecuteProcess if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -222,7 +224,7 @@ func (b *Bridge) execProcessV2(r *Request) (_ RequestResponse, err error) { func (b *Bridge) killContainerV2(r *Request) (RequestResponse, error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::killContainerV2") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) return b.signalContainerV2(ctx, span, r, unix.SIGKILL) } @@ -235,7 +237,7 @@ func (b *Bridge) killContainerV2(r *Request) (RequestResponse, error) { func (b *Bridge) shutdownContainerV2(r *Request) (RequestResponse, error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::shutdownContainerV2") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) return b.signalContainerV2(ctx, span, r, unix.SIGTERM) } @@ -246,7 +248,7 @@ func (b *Bridge) shutdownContainerV2(r *Request) (RequestResponse, error) { func (b *Bridge) signalContainerV2(ctx context.Context, span *trace.Span, r *Request, signal syscall.Signal) (_ RequestResponse, err error) { defer func() { oc.SetSpanStatus(span, err) }() span.AddAttributes( - trace.StringAttribute("cid", r.ContainerID), + trace.StringAttribute(logfields.ContainerID, r.ContainerID), trace.Int64Attribute("signal", int64(signal))) var request prot.MessageBase @@ -282,7 +284,7 @@ func (b *Bridge) signalProcessV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::signalProcessV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerSignalProcess if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -320,7 +322,7 @@ func (b *Bridge) getPropertiesV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::getPropertiesV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerGetProperties if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -383,7 +385,7 @@ func (b *Bridge) waitOnProcessV2(r *Request) (_ RequestResponse, err error) { _, span := trace.StartSpan(r.Context, "opengcs::bridge::waitOnProcessV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerWaitForProcess if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -438,7 +440,7 @@ func (b *Bridge) resizeConsoleV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::resizeConsoleV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerResizeConsole if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -472,7 +474,7 @@ func (b *Bridge) modifySettingsV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::modifySettingsV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) request, err := prot.UnmarshalContainerModifySettings(r.Message) if err != nil { @@ -504,7 +506,7 @@ func (b *Bridge) deleteContainerStateV2(r *Request) (_ RequestResponse, err erro defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.MessageBase if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { diff --git a/internal/guest/oom/oom.go b/internal/guest/oom/oom.go new file mode 100644 index 0000000000..174c9b3af0 --- /dev/null +++ b/internal/guest/oom/oom.go @@ -0,0 +1,70 @@ +// Package oom providers wrappers and support to leverage the OOM watcher functionality +// in github.com/containerd/containerd/pkg/oom +package oom + +import ( + "context" + "fmt" + + "github.com/Microsoft/hcsshim/internal/guest/prot" + log "github.com/Microsoft/hcsshim/internal/log" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/v2/shim" + "github.com/pkg/errors" +) + +const emptyActivityID = "00000000-0000-0000-0000-000000000000" + +// wraps `bridge.PublishNotification` to conform to `shim.Publish`, so that +// it can be used by containerd's `oom.Watcher` +type BridgePublisherFunc func(*prot.ContainerNotification) + +var _ shim.Publisher = BridgePublisherFunc(func(_ *prot.ContainerNotification) {}) + +func (bp BridgePublisherFunc) Close() error { + // noop + return nil +} + +func (bp BridgePublisherFunc) Publish(ctx context.Context, topic string, event events.Event) error { + n, err := mapEventToNotification(topic, event) + + if err != nil { + return errors.Wrapf(err, "could not handle topic %q and event %+v", topic, event) + } + + log.G(ctx).WithField("event", fmt.Sprintf("%+v", event)).WithField("eventTopic", topic).Debug("publishing event over notification channel") + + bp(n) + return nil +} + +// may want to expand this to other events in the future +func mapEventToNotification(topic string, event events.Event) (*prot.ContainerNotification, error) { + if topic != runtime.TaskOOMEventTopic { + return nil, fmt.Errorf("unsupported event topic %q", topic) + } + + e, ok := event.(*eventstypes.TaskOOM) + if !ok { + return nil, fmt.Errorf("unsupported event type %T", e) + } + + // TODO: add logging here + notification := prot.ContainerNotification{ + MessageBase: prot.MessageBase{ + ActivityID: emptyActivityID, + ContainerID: e.ContainerID, + }, + // not necessarily an exit + Type: prot.NtConstructed, + Operation: prot.AoNone, + Result: 0, + ResultInfo: fmt.Sprintf("%q", topic), + } + + return ¬ification, nil + +} diff --git a/internal/guest/runtime/hcsv2/container.go b/internal/guest/runtime/hcsv2/container.go index 9dcd080cb3..c30886177d 100644 --- a/internal/guest/runtime/hcsv2/container.go +++ b/internal/guest/runtime/hcsv2/container.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package hcsv2 @@ -188,8 +189,7 @@ func (c *Container) GetStats(ctx context.Context) (*v1.Metrics, error) { defer span.End() span.AddAttributes(trace.StringAttribute("cid", c.id)) - cgroupPath := c.spec.Linux.CgroupsPath - cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroupPath)) + cg, err := c.GetCGroup(ctx) if err != nil { return nil, errors.Errorf("failed to get container stats for %v: %v", c.id, err) } @@ -197,6 +197,22 @@ func (c *Container) GetStats(ctx context.Context) (*v1.Metrics, error) { return cg.Stat(cgroups.IgnoreNotExist) } +// GetCGroup returns the cgroup +func (c *Container) GetCGroup(ctx context.Context) (cgroups.Cgroup, error) { + _, span := trace.StartSpan(ctx, "opengcs::Container::GetCGroup") + defer span.End() + span.AddAttributes(trace.StringAttribute("cid", c.id)) + + cgroupPath := c.spec.Linux.CgroupsPath + // TODO: allow for switching to V2 + cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroupPath)) + if err != nil { + return nil, errors.Errorf("failed to load container control group %v: %v", c.id, err) + } + + return cg, nil +} + func (c *Container) modifyContainerConstraints(ctx context.Context, rt prot.ModifyRequestType, cc *prot.ContainerConstraintsV2) (err error) { return c.Update(ctx, cc.Linux) } diff --git a/internal/guest/storage/devicemapper/devicemapper.go b/internal/guest/storage/devicemapper/devicemapper.go index 01d712d37a..c7157fe445 100644 --- a/internal/guest/storage/devicemapper/devicemapper.go +++ b/internal/guest/storage/devicemapper/devicemapper.go @@ -6,6 +6,8 @@ import ( "fmt" "os" "path" + "syscall" + "time" "unsafe" "golang.org/x/sys/unix" @@ -19,6 +21,11 @@ const ( CreateReadOnly CreateFlags = 1 << iota ) +var ( + removeDeviceWrapper = removeDevice + openMapperWrapper = openMapper +) + const ( _IOC_WRITE = 1 _IOC_READ = 2 @@ -223,7 +230,7 @@ func makeTableIoctl(name string, targets []Target) *dmIoctl { // CreateDevice creates a device-mapper device with the given target spec. It returns // the path of the new device node. func CreateDevice(name string, flags CreateFlags, targets []Target) (_ string, err error) { - f, err := openMapper() + f, err := openMapperWrapper() if err != nil { return "", err } @@ -238,7 +245,7 @@ func CreateDevice(name string, flags CreateFlags, targets []Target) (_ string, e } defer func() { if err != nil { - removeDevice(f, name) + removeDeviceWrapper(f, name) } }() @@ -269,14 +276,30 @@ func CreateDevice(name string, flags CreateFlags, targets []Target) (_ string, e } // RemoveDevice removes a device-mapper device and its associated device node. -func RemoveDevice(name string) error { - f, err := openMapper() - if err != nil { - return err +func RemoveDevice(name string) (err error) { + rm := func() error { + f, err := openMapperWrapper() + if err != nil { + return err + } + defer f.Close() + os.Remove(path.Join("/dev/mapper", name)) + return removeDeviceWrapper(f, name) } - defer f.Close() - os.Remove(path.Join("/dev/mapper", name)) - return removeDevice(f, name) + + // This is workaround for "device or resource busy" error, which occasionally happens after the device mapper + // target has been unmounted. + for i := 0; i < 10; i++ { + if err = rm(); err != nil { + if e, ok := err.(*dmError); !ok || e.Err != syscall.EBUSY { + break + } + time.Sleep(10 * time.Millisecond) + continue + } + break + } + return } func removeDevice(f *os.File, name string) error { diff --git a/internal/guest/storage/devicemapper/devicemapper_test.go b/internal/guest/storage/devicemapper/devicemapper_test.go index 723fe1496a..278345b484 100644 --- a/internal/guest/storage/devicemapper/devicemapper_test.go +++ b/internal/guest/storage/devicemapper/devicemapper_test.go @@ -4,7 +4,9 @@ package devicemapper import ( "flag" + "io/ioutil" "os" + "syscall" "testing" "unsafe" @@ -15,6 +17,11 @@ var ( integration = flag.Bool("integration", false, "run integration tests") ) +func clearTestDependencies() { + removeDeviceWrapper = removeDevice + openMapperWrapper = openMapper +} + func TestMain(m *testing.M) { flag.Parse() m.Run() @@ -75,6 +82,8 @@ func createDevice(name string, flags CreateFlags, targets []Target) (*device, er } func TestCreateError(t *testing.T) { + clearTestDependencies() + if !*integration { t.Skip() } @@ -94,6 +103,8 @@ func TestCreateError(t *testing.T) { } func TestReadOnlyError(t *testing.T) { + clearTestDependencies() + if !*integration { t.Skip() } @@ -113,6 +124,8 @@ func TestReadOnlyError(t *testing.T) { } func TestLinearError(t *testing.T) { + clearTestDependencies() + if !*integration { t.Skip() } @@ -140,3 +153,73 @@ func TestLinearError(t *testing.T) { t.Fatal(err) } } + +func TestRemoveDeviceRetriesOnSyscallEBUSY(t *testing.T) { + clearTestDependencies() + + rmDeviceCalled := false + retryDone := false + // Overrides openMapper to return temp file handle + openMapperWrapper = func() (*os.File, error) { + return ioutil.TempFile("", "") + } + removeDeviceWrapper = func(_ *os.File, _ string) error { + if !rmDeviceCalled { + rmDeviceCalled = true + return &dmError{ + Op: 1, + Err: syscall.EBUSY, + } + } + if !retryDone { + retryDone = true + return nil + } + return nil + } + + if err := RemoveDevice("test"); err != nil { + t.Fatalf("expected no error, got: %s", err) + } + if !rmDeviceCalled { + t.Fatalf("expected removeDevice to be called at least once") + } + if !retryDone { + t.Fatalf("expected removeDevice to be retried after initial failure") + } +} + +func TestRemoveDeviceFailsOnNonSyscallEBUSY(t *testing.T) { + clearTestDependencies() + + expectedError := &dmError{ + Op: 0, + Err: syscall.EACCES, + } + rmDeviceCalled := false + retryDone := false + openMapperWrapper = func() (*os.File, error) { + return ioutil.TempFile("", "") + } + removeDeviceWrapper = func(_ *os.File, _ string) error { + if !rmDeviceCalled { + rmDeviceCalled = true + return expectedError + } + if !retryDone { + retryDone = true + return nil + } + return nil + } + + if err := RemoveDevice("test"); err != expectedError { + t.Fatalf("expected error %q, instead got %q", expectedError, err) + } + if !rmDeviceCalled { + t.Fatalf("expected removeDevice to be called once") + } + if retryDone { + t.Fatalf("no retries should've been attempted") + } +} diff --git a/internal/guest/storage/pmem/pmem.go b/internal/guest/storage/pmem/pmem.go index 681659f061..8826e3d00d 100644 --- a/internal/guest/storage/pmem/pmem.go +++ b/internal/guest/storage/pmem/pmem.go @@ -143,14 +143,16 @@ func Unmount(ctx context.Context, devNumber uint32, target string, mappingInfo * if verityInfo != nil { dmVerityName := fmt.Sprintf(verityDeviceFmt, devNumber, verityInfo.RootDigest) if err := dm.RemoveDevice(dmVerityName); err != nil { - return errors.Wrapf(err, "failed to remove dm verity target: %s", dmVerityName) + // The target is already unmounted at this point, ignore potential errors + log.G(ctx).WithError(err).Debugf("failed to remove dm verity target: %s", dmVerityName) } } if mappingInfo != nil { dmLinearName := fmt.Sprintf(linearDeviceFmt, devNumber, mappingInfo.DeviceOffsetInBytes, mappingInfo.DeviceSizeInBytes) if err := dm.RemoveDevice(dmLinearName); err != nil { - return errors.Wrapf(err, "failed to remove dm linear target: %s", dmLinearName) + // The target is already unmounted at this point, ignore potential errors + log.G(ctx).WithError(err).Debugf("failed to remove dm linear target: %s", dmLinearName) } } diff --git a/internal/logfields/fields.go b/internal/logfields/fields.go index cf2c166d9b..bf210ccc97 100644 --- a/internal/logfields/fields.go +++ b/internal/logfields/fields.go @@ -6,6 +6,7 @@ const ( ContainerID = "cid" UVMID = "uvm-id" ProcessID = "pid" + TaskID = "tid" // Common Misc @@ -18,6 +19,7 @@ const ( Field = "field" OCIAnnotation = "oci-annotation" Value = "value" + Options = "options" // Golang type's