diff --git a/cmd/containerd-shim-runhcs-v1/serve.go b/cmd/containerd-shim-runhcs-v1/serve.go index 65f6a5dcaf..f1b6267b5d 100644 --- a/cmd/containerd-shim-runhcs-v1/serve.go +++ b/cmd/containerd-shim-runhcs-v1/serve.go @@ -199,6 +199,8 @@ var serveCommand = cli.Command{ } defer sl.Close() + logrus.WithField("options", fmt.Sprintf("%+v", shimOpts)).Debug("containerd-shim-runhcs-v1::serve options") + serrs := make(chan error, 1) defer close(serrs) go func() { diff --git a/cmd/gcs/main.go b/cmd/gcs/main.go index 7dacf5d5b8..10c26fa560 100644 --- a/cmd/gcs/main.go +++ b/cmd/gcs/main.go @@ -1,8 +1,10 @@ +//go:build linux // +build linux package main import ( + "context" "flag" "fmt" "io" @@ -166,13 +168,15 @@ func main() { if err != nil { logrus.WithError(err).Fatal("failed to initialize new runc runtime") } - mux := bridge.NewBridgeMux() - b := bridge.Bridge{ - Handler: mux, - EnableV4: *v4, - } + h := hcsv2.NewHost(rtime, tport) - b.AssignHandlers(mux, h) + b, err := bridge.NewBridge(bridge.WithHost(h), + bridge.WithV4Enabled(*v4), + bridge.WithCgroupVersion(1)) + if err != nil { + logrus.WithError(err).Fatal("could not create bridge") + } + logrus.Debug("successfully created bridge") var bridgeIn io.ReadCloser var bridgeOut io.WriteCloser @@ -248,6 +252,9 @@ func main() { oomFile := os.NewFile(oom, "cefd") defer oomFile.Close() + b.OomWatcher.Add("containers", containersControl) + go b.OomWatcher.Run(context.Background()) + logrus.Warning("added cgrpup \"containers\" to oom watcher") go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl) go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl) err = b.ListenAndServe(bridgeIn, bridgeOut) diff --git a/container.go b/container.go index bfd722898e..1de9625b91 100644 --- a/container.go +++ b/container.go @@ -60,7 +60,7 @@ type container struct { waitCh chan struct{} } -// createComputeSystemAdditionalJSON is read from the environment at initialisation +// 
createContainerAdditionalJSON is read from the environment at initialization // time. It allows an environment variable to define additional JSON which // is merged in the CreateComputeSystem call to HCS. var createContainerAdditionalJSON []byte diff --git a/internal/gcs/container.go b/internal/gcs/container.go index 6ec5b8b84f..1534682659 100644 --- a/internal/gcs/container.go +++ b/internal/gcs/container.go @@ -10,6 +10,7 @@ import ( "github.com/Microsoft/hcsshim/internal/hcs/schema1" hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" "github.com/Microsoft/hcsshim/internal/log" + "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/oc" "go.opencensus.io/trace" ) @@ -34,7 +35,7 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf ctx, span := trace.StartSpan(ctx, "gcs::GuestConnection::CreateContainer") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", cid)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, cid)) c := &Container{ gc: gc, @@ -55,7 +56,7 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf if err != nil { return nil, err } - go c.waitBackground() + go c.waitBackground(ctx) return c, nil } @@ -72,7 +73,7 @@ func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *C if err != nil { return nil, err } - go c.waitBackground() + go c.waitBackground(ctx) return c, nil } @@ -87,12 +88,12 @@ func (c *Container) IsOCI() bool { return c.gc.os != "windows" } -// Close releases associated with the container. +// Close releases resources associated with the container. 
func (c *Container) Close() error { c.closeOnce.Do(func() { _, span := trace.StartSpan(context.Background(), "gcs::Container::Close") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) }) return nil } @@ -102,7 +103,7 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co ctx, span := trace.StartSpan(ctx, "gcs::Container::CreateProcess") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) return c.gc.exec(ctx, c.id, config) } @@ -117,7 +118,7 @@ func (c *Container) Modify(ctx context.Context, config interface{}) (err error) ctx, span := trace.StartSpan(ctx, "gcs::Container::Modify") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := containerModifySettings{ requestBase: makeRequest(ctx, c.id), @@ -132,7 +133,7 @@ func (c *Container) Properties(ctx context.Context, types ...schema1.PropertyTyp ctx, span := trace.StartSpan(ctx, "gcs::Container::Properties") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := containerGetProperties{ requestBase: makeRequest(ctx, c.id), @@ -151,7 +152,7 @@ func (c *Container) PropertiesV2(ctx context.Context, types ...hcsschema.Propert ctx, span := trace.StartSpan(ctx, "gcs::Container::PropertiesV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := containerGetPropertiesV2{ requestBase: makeRequest(ctx, c.id), @@ -170,7 +171,7 @@ 
func (c *Container) Start(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "gcs::Container::Start") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) req := makeRequest(ctx, c.id) var resp responseBase @@ -201,7 +202,7 @@ func (c *Container) Shutdown(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "gcs::Container::Shutdown") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -215,7 +216,7 @@ func (c *Container) Terminate(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "gcs::Container::Terminate") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -233,12 +234,16 @@ func (c *Container) Wait() error { } } -func (c *Container) waitBackground() { - ctx, span := trace.StartSpan(context.Background(), "gcs::Container::waitBackground") +// should only be called once; will wait for container exit and notify other +// `Wait()` calls, as well as update notification buffer +func (c *Container) waitBackground(ctx context.Context) { + var err error + ctx, span := trace.StartSpan(ctx, "gcs::Container::waitBackground") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", c.id)) + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, c.id)) + + err = c.Wait() - err := c.Wait() - log.G(ctx).Debug("container exited") - oc.SetSpanStatus(span, err) + 
log.G(ctx).WithField(logfields.ContainerID, c.id).Debug("container exited") } diff --git a/internal/gcs/guestconnection.go b/internal/gcs/guestconnection.go index f01b97939b..8f4fa498e0 100644 --- a/internal/gcs/guestconnection.go +++ b/internal/gcs/guestconnection.go @@ -249,6 +249,7 @@ func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error { func (gc *GuestConnection) notify(ntf *containerNotification) error { cid := ntf.ContainerID + logrus.WithField(logfields.ContainerID, cid).Info("received notification from guest") gc.mu.Lock() ch := gc.notifyChs[cid] delete(gc.notifyChs, cid) diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go index 7a399820cb..dd2e58d3f3 100644 --- a/internal/guest/bridge/bridge.go +++ b/internal/guest/bridge/bridge.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux // Package bridge defines the bridge struct, which implements the control loop @@ -19,10 +20,13 @@ import ( "time" "github.com/Microsoft/hcsshim/internal/guest/gcserr" + hcsoom "github.com/Microsoft/hcsshim/internal/guest/oom" "github.com/Microsoft/hcsshim/internal/guest/prot" "github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2" "github.com/Microsoft/hcsshim/internal/log" "github.com/Microsoft/hcsshim/internal/oc" + "github.com/containerd/containerd/pkg/oom" + oomv1 "github.com/containerd/containerd/pkg/oom/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -132,7 +136,7 @@ func (mux *Mux) ServeMsg(r *Request) (RequestResponse, error) { type Request struct { // Context is the request context received from the bridge. Context context.Context - // Header is the wire format message header that preceeded the message for + // Header is the wire format message header that preceded the message for // this request. Header *prot.MessageHeader // ContainerID is the id of the container that this message corresponds to. 
@@ -159,19 +163,54 @@ type bridgeResponse struct { response interface{} } +type bridgeOpts struct { + enableV4 bool + cgroupV2 bool + host *hcsv2.Host +} + +type BridgeOpt func(*bridgeOpts) error + +func WithV4Enabled(enableV4 bool) BridgeOpt { + return func(bo *bridgeOpts) error { + bo.enableV4 = enableV4 + return nil + } +} + +func WithCgroupVersion(version uint) BridgeOpt { + return func(bo *bridgeOpts) error { + if version < 1 || version > 2 { + return errors.New(fmt.Sprintf("unsupported CGroup version %d", version)) + } else if version == 2 { + return errors.New("version 2 CGroups are currently not supported") + } + + bo.cgroupV2 = false + return nil + } +} + +func WithHost(h *hcsv2.Host) BridgeOpt { + return func(bo *bridgeOpts) error { + bo.host = h + return nil + } +} + // Bridge defines the bridge client in the GCS. It acts in many ways analogous // to go's `http` package and multiplexer. // // It has two fundamentally different dispatch options: // // 1. Request/Response where using the `Handler` a request -// of a given type will be dispatched to the apprpriate handler -// and an appropriate response will respond to exactly that request that -// caused the dispatch. +// of a given type will be dispatched to the appropriate handler +// and an appropriate response will respond to exactly that request that +// caused the dispatch. // // 2. `PublishNotification` where a notification that was not initiated -// by a request from any client can be written to the bridge at any time -// in any order. +// by a request from any client can be written to the bridge at any time +// in any order. type Bridge struct { // Handler to invoke when messages are received. 
Handler Handler @@ -189,6 +228,39 @@ type Bridge struct { hasQuitPending uint32 protVer prot.ProtocolVersion + + // watch for OOM events, new cgroups can be added as they are created + // publishes them over responseChan via PublishNotification() + OomWatcher oom.Watcher +} + +func NewBridge(opts ...BridgeOpt) (*Bridge, error) { + var bo bridgeOpts + for _, o := range opts { + err := o(&bo) + if err != nil { + return nil, errors.Wrap(err, "could not initialize bridge") + } + } + + mux := NewBridgeMux() + b := Bridge{ + Handler: mux, + EnableV4: bo.enableV4, + } + + b.AssignHandlers(mux, bo.host) + + publisher := hcsoom.BridgePublisherFunc(b.PublishNotification) + // TODO: (helsaawy) add v2 OOM watcher with v2 cgroups support + ep, err := oomv1.New(publisher) + + if err != nil { + return nil, errors.Wrap(err, "could not create OOM watcher to add to bridge") + } + b.OomWatcher = ep + + return &b, nil } // AssignHandlers creates and assigns the appropriate bridge diff --git a/internal/guest/bridge/bridge_v2.go b/internal/guest/bridge/bridge_v2.go index 460579b524..062484fbf7 100644 --- a/internal/guest/bridge/bridge_v2.go +++ b/internal/guest/bridge/bridge_v2.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package bridge @@ -15,6 +16,7 @@ import ( "github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2" "github.com/Microsoft/hcsshim/internal/guest/stdio" "github.com/Microsoft/hcsshim/internal/log" + "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/oc" "github.com/pkg/errors" "go.opencensus.io/trace" @@ -47,7 +49,7 @@ func (b *Bridge) negotiateProtocolV2(r *Request) (_ RequestResponse, err error) _, span := trace.StartSpan(r.Context, "opengcs::bridge::negotiateProtocolV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.NegotiateProtocol if err 
:= commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -83,7 +85,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::createContainerV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerCreate if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -136,7 +138,7 @@ func (b *Bridge) startContainerV2(r *Request) (_ RequestResponse, err error) { _, span := trace.StartSpan(r.Context, "opengcs::bridge::startContainerV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) // This is just a noop, but needs to be handled so that an error isn't // returned to the HCS. 
@@ -167,7 +169,7 @@ func (b *Bridge) execProcessV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::execProcessV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerExecuteProcess if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -222,7 +224,7 @@ func (b *Bridge) execProcessV2(r *Request) (_ RequestResponse, err error) { func (b *Bridge) killContainerV2(r *Request) (RequestResponse, error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::killContainerV2") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) return b.signalContainerV2(ctx, span, r, unix.SIGKILL) } @@ -235,7 +237,7 @@ func (b *Bridge) killContainerV2(r *Request) (RequestResponse, error) { func (b *Bridge) shutdownContainerV2(r *Request) (RequestResponse, error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::shutdownContainerV2") defer span.End() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) return b.signalContainerV2(ctx, span, r, unix.SIGTERM) } @@ -246,7 +248,7 @@ func (b *Bridge) shutdownContainerV2(r *Request) (RequestResponse, error) { func (b *Bridge) signalContainerV2(ctx context.Context, span *trace.Span, r *Request, signal syscall.Signal) (_ RequestResponse, err error) { defer func() { oc.SetSpanStatus(span, err) }() span.AddAttributes( - trace.StringAttribute("cid", r.ContainerID), + trace.StringAttribute(logfields.ContainerID, r.ContainerID), trace.Int64Attribute("signal", int64(signal))) var request prot.MessageBase @@ -282,7 +284,7 @@ func (b *Bridge) signalProcessV2(r *Request) (_ 
RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::signalProcessV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerSignalProcess if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -320,7 +322,7 @@ func (b *Bridge) getPropertiesV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::getPropertiesV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerGetProperties if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -383,7 +385,7 @@ func (b *Bridge) waitOnProcessV2(r *Request) (_ RequestResponse, err error) { _, span := trace.StartSpan(r.Context, "opengcs::bridge::waitOnProcessV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerWaitForProcess if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -438,7 +440,7 @@ func (b *Bridge) resizeConsoleV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::resizeConsoleV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.ContainerResizeConsole if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { @@ -472,7 +474,7 @@ func (b *Bridge) 
modifySettingsV2(r *Request) (_ RequestResponse, err error) { ctx, span := trace.StartSpan(r.Context, "opengcs::bridge::modifySettingsV2") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) request, err := prot.UnmarshalContainerModifySettings(r.Message) if err != nil { @@ -504,7 +506,7 @@ func (b *Bridge) deleteContainerStateV2(r *Request) (_ RequestResponse, err erro defer span.End() defer func() { oc.SetSpanStatus(span, err) }() - span.AddAttributes(trace.StringAttribute("cid", r.ContainerID)) + span.AddAttributes(trace.StringAttribute(logfields.ContainerID, r.ContainerID)) var request prot.MessageBase if err := commonutils.UnmarshalJSONWithHresult(r.Message, &request); err != nil { diff --git a/internal/guest/oom/oom.go b/internal/guest/oom/oom.go new file mode 100644 index 0000000000..174c9b3af0 --- /dev/null +++ b/internal/guest/oom/oom.go @@ -0,0 +1,70 @@ +// Package oom provides wrappers and support to leverage the OOM watcher functionality +// in github.com/containerd/containerd/pkg/oom +package oom + +import ( + "context" + "fmt" + + "github.com/Microsoft/hcsshim/internal/guest/prot" + log "github.com/Microsoft/hcsshim/internal/log" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/v2/shim" + "github.com/pkg/errors" +) + +const emptyActivityID = "00000000-0000-0000-0000-000000000000" + +// wraps `bridge.PublishNotification` to conform to `shim.Publish`, so that +// it can be used by containerd's `oom.Watcher` +type BridgePublisherFunc func(*prot.ContainerNotification) + +var _ shim.Publisher = BridgePublisherFunc(func(_ *prot.ContainerNotification) {}) + +func (bp BridgePublisherFunc) Close() error { + // noop + return nil +} + +func (bp 
BridgePublisherFunc) Publish(ctx context.Context, topic string, event events.Event) error { + n, err := mapEventToNotification(topic, event) + + if err != nil { + return errors.Wrapf(err, "could not handle topic %q and event %+v", topic, event) + } + + log.G(ctx).WithField("event", fmt.Sprintf("%+v", event)).WithField("eventTopic", topic).Debug("publishing event over notification channel") + + bp(n) + return nil +} + +// may want to expand this to other events in the future +func mapEventToNotification(topic string, event events.Event) (*prot.ContainerNotification, error) { + if topic != runtime.TaskOOMEventTopic { + return nil, fmt.Errorf("unsupported event topic %q", topic) + } + + e, ok := event.(*eventstypes.TaskOOM) + if !ok { + return nil, fmt.Errorf("unsupported event type %T", e) + } + + // TODO: add logging here + notification := prot.ContainerNotification{ + MessageBase: prot.MessageBase{ + ActivityID: emptyActivityID, + ContainerID: e.ContainerID, + }, + // not necessarily an exit + Type: prot.NtConstructed, + Operation: prot.AoNone, + Result: 0, + ResultInfo: fmt.Sprintf("%q", topic), + } + + return &notification, nil + +} diff --git a/internal/guest/runtime/hcsv2/container.go b/internal/guest/runtime/hcsv2/container.go index 9dcd080cb3..c30886177d 100644 --- a/internal/guest/runtime/hcsv2/container.go +++ b/internal/guest/runtime/hcsv2/container.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package hcsv2 @@ -188,8 +189,7 @@ func (c *Container) GetStats(ctx context.Context) (*v1.Metrics, error) { defer span.End() span.AddAttributes(trace.StringAttribute("cid", c.id)) - cgroupPath := c.spec.Linux.CgroupsPath - cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroupPath)) + cg, err := c.GetCGroup(ctx) if err != nil { return nil, errors.Errorf("failed to get container stats for %v: %v", c.id, err) } @@ -197,6 +197,22 @@ func (c *Container) GetStats(ctx context.Context) (*v1.Metrics, error) { return cg.Stat(cgroups.IgnoreNotExist) } +// GetCGroup 
returns the cgroup +func (c *Container) GetCGroup(ctx context.Context) (cgroups.Cgroup, error) { + _, span := trace.StartSpan(ctx, "opengcs::Container::GetCGroup") + defer span.End() + span.AddAttributes(trace.StringAttribute("cid", c.id)) + + cgroupPath := c.spec.Linux.CgroupsPath + // TODO: allow for switching to V2 + cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroupPath)) + if err != nil { + return nil, errors.Errorf("failed to load container control group %v: %v", c.id, err) + } + + return cg, nil +} + func (c *Container) modifyContainerConstraints(ctx context.Context, rt prot.ModifyRequestType, cc *prot.ContainerConstraintsV2) (err error) { return c.Update(ctx, cc.Linux) } diff --git a/internal/guest/storage/devicemapper/devicemapper.go b/internal/guest/storage/devicemapper/devicemapper.go index 01d712d37a..c7157fe445 100644 --- a/internal/guest/storage/devicemapper/devicemapper.go +++ b/internal/guest/storage/devicemapper/devicemapper.go @@ -6,6 +6,8 @@ import ( "fmt" "os" "path" + "syscall" + "time" "unsafe" "golang.org/x/sys/unix" @@ -19,6 +21,11 @@ const ( CreateReadOnly CreateFlags = 1 << iota ) +var ( + removeDeviceWrapper = removeDevice + openMapperWrapper = openMapper +) + const ( _IOC_WRITE = 1 _IOC_READ = 2 @@ -223,7 +230,7 @@ func makeTableIoctl(name string, targets []Target) *dmIoctl { // CreateDevice creates a device-mapper device with the given target spec. It returns // the path of the new device node. 
func CreateDevice(name string, flags CreateFlags, targets []Target) (_ string, err error) { - f, err := openMapper() + f, err := openMapperWrapper() if err != nil { return "", err } @@ -238,7 +245,7 @@ func CreateDevice(name string, flags CreateFlags, targets []Target) (_ string, e } defer func() { if err != nil { - removeDevice(f, name) + removeDeviceWrapper(f, name) } }() @@ -269,14 +276,30 @@ func CreateDevice(name string, flags CreateFlags, targets []Target) (_ string, e } // RemoveDevice removes a device-mapper device and its associated device node. -func RemoveDevice(name string) error { - f, err := openMapper() - if err != nil { - return err +func RemoveDevice(name string) (err error) { + rm := func() error { + f, err := openMapperWrapper() + if err != nil { + return err + } + defer f.Close() + os.Remove(path.Join("/dev/mapper", name)) + return removeDeviceWrapper(f, name) } - defer f.Close() - os.Remove(path.Join("/dev/mapper", name)) - return removeDevice(f, name) + + // This is a workaround for the "device or resource busy" error, which occasionally happens after the device mapper + // target has been unmounted. 
+ for i := 0; i < 10; i++ { + if err = rm(); err != nil { + if e, ok := err.(*dmError); !ok || e.Err != syscall.EBUSY { + break + } + time.Sleep(10 * time.Millisecond) + continue + } + break + } + return } func removeDevice(f *os.File, name string) error { diff --git a/internal/guest/storage/devicemapper/devicemapper_test.go b/internal/guest/storage/devicemapper/devicemapper_test.go index 723fe1496a..278345b484 100644 --- a/internal/guest/storage/devicemapper/devicemapper_test.go +++ b/internal/guest/storage/devicemapper/devicemapper_test.go @@ -4,7 +4,9 @@ package devicemapper import ( "flag" + "io/ioutil" "os" + "syscall" "testing" "unsafe" @@ -15,6 +17,11 @@ var ( integration = flag.Bool("integration", false, "run integration tests") ) +func clearTestDependencies() { + removeDeviceWrapper = removeDevice + openMapperWrapper = openMapper +} + func TestMain(m *testing.M) { flag.Parse() m.Run() @@ -75,6 +82,8 @@ func createDevice(name string, flags CreateFlags, targets []Target) (*device, er } func TestCreateError(t *testing.T) { + clearTestDependencies() + if !*integration { t.Skip() } @@ -94,6 +103,8 @@ func TestCreateError(t *testing.T) { } func TestReadOnlyError(t *testing.T) { + clearTestDependencies() + if !*integration { t.Skip() } @@ -113,6 +124,8 @@ func TestReadOnlyError(t *testing.T) { } func TestLinearError(t *testing.T) { + clearTestDependencies() + if !*integration { t.Skip() } @@ -140,3 +153,73 @@ func TestLinearError(t *testing.T) { t.Fatal(err) } } + +func TestRemoveDeviceRetriesOnSyscallEBUSY(t *testing.T) { + clearTestDependencies() + + rmDeviceCalled := false + retryDone := false + // Overrides openMapper to return temp file handle + openMapperWrapper = func() (*os.File, error) { + return ioutil.TempFile("", "") + } + removeDeviceWrapper = func(_ *os.File, _ string) error { + if !rmDeviceCalled { + rmDeviceCalled = true + return &dmError{ + Op: 1, + Err: syscall.EBUSY, + } + } + if !retryDone { + retryDone = true + return nil + } + return nil + } 
+ + if err := RemoveDevice("test"); err != nil { + t.Fatalf("expected no error, got: %s", err) + } + if !rmDeviceCalled { + t.Fatalf("expected removeDevice to be called at least once") + } + if !retryDone { + t.Fatalf("expected removeDevice to be retried after initial failure") + } +} + +func TestRemoveDeviceFailsOnNonSyscallEBUSY(t *testing.T) { + clearTestDependencies() + + expectedError := &dmError{ + Op: 0, + Err: syscall.EACCES, + } + rmDeviceCalled := false + retryDone := false + openMapperWrapper = func() (*os.File, error) { + return ioutil.TempFile("", "") + } + removeDeviceWrapper = func(_ *os.File, _ string) error { + if !rmDeviceCalled { + rmDeviceCalled = true + return expectedError + } + if !retryDone { + retryDone = true + return nil + } + return nil + } + + if err := RemoveDevice("test"); err != expectedError { + t.Fatalf("expected error %q, instead got %q", expectedError, err) + } + if !rmDeviceCalled { + t.Fatalf("expected removeDevice to be called once") + } + if retryDone { + t.Fatalf("no retries should've been attempted") + } +} diff --git a/internal/guest/storage/pmem/pmem.go b/internal/guest/storage/pmem/pmem.go index 681659f061..8826e3d00d 100644 --- a/internal/guest/storage/pmem/pmem.go +++ b/internal/guest/storage/pmem/pmem.go @@ -143,14 +143,16 @@ func Unmount(ctx context.Context, devNumber uint32, target string, mappingInfo * if verityInfo != nil { dmVerityName := fmt.Sprintf(verityDeviceFmt, devNumber, verityInfo.RootDigest) if err := dm.RemoveDevice(dmVerityName); err != nil { - return errors.Wrapf(err, "failed to remove dm verity target: %s", dmVerityName) + // The target is already unmounted at this point, ignore potential errors + log.G(ctx).WithError(err).Debugf("failed to remove dm verity target: %s", dmVerityName) } } if mappingInfo != nil { dmLinearName := fmt.Sprintf(linearDeviceFmt, devNumber, mappingInfo.DeviceOffsetInBytes, mappingInfo.DeviceSizeInBytes) if err := dm.RemoveDevice(dmLinearName); err != nil { - return 
errors.Wrapf(err, "failed to remove dm linear target: %s", dmLinearName) + // The target is already unmounted at this point, ignore potential errors + log.G(ctx).WithError(err).Debugf("failed to remove dm linear target: %s", dmLinearName) } } diff --git a/internal/logfields/fields.go b/internal/logfields/fields.go index cf2c166d9b..bf210ccc97 100644 --- a/internal/logfields/fields.go +++ b/internal/logfields/fields.go @@ -6,6 +6,7 @@ const ( ContainerID = "cid" UVMID = "uvm-id" ProcessID = "pid" + TaskID = "tid" // Common Misc @@ -18,6 +19,7 @@ const ( Field = "field" OCIAnnotation = "oci-annotation" Value = "value" + Options = "options" // Golang type's