Skip to content

Commit

Permalink
Polling rkt implementation of new watcher interface (google#1284)
Browse files Browse the repository at this point in the history
polling rkt implementation of new watcher interface
  • Loading branch information
sjpotter authored and tallclair committed May 17, 2016
1 parent 8cf6ed3 commit 6fa3687
Show file tree
Hide file tree
Showing 14 changed files with 415 additions and 141 deletions.
5 changes: 2 additions & 3 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

192 changes: 98 additions & 94 deletions Godeps/_workspace/src/github.com/coreos/rkt/api/v1alpha/api.pb.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ const (
ListRecursive
)

// ContainerType enumerates the kinds of container handlers; it is the value
// returned by ContainerHandler.Type().
type ContainerType int

// The constants are numbered by iota, starting at 0 with ContainerTypeRaw.
const (
ContainerTypeRaw ContainerType = iota
ContainerTypeDocker
ContainerTypeRkt
ContainerTypeSystemd
)

// Interface for container operation handlers.
type ContainerHandler interface {
// Returns the ContainerReference
Expand Down Expand Up @@ -59,4 +68,7 @@ type ContainerHandler interface {
// Start starts any necessary background goroutines - must be cleaned up in Cleanup().
// It is expected that most implementations will be a no-op.
Start()

// Type of handler
Type() ContainerType
}
4 changes: 4 additions & 0 deletions container/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,7 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) (
func (self *dockerContainerHandler) Exists() bool {
return common.CgroupExists(self.cgroupPaths)
}

// Type reports this handler's container type, which is always
// container.ContainerTypeDocker.
func (self *dockerContainerHandler) Type() container.ContainerType {
return container.ContainerTypeDocker
}
4 changes: 2 additions & 2 deletions container/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (self *MockContainerHandler) GetContainerLabels() map[string]string {
return args.Get(0).(map[string]string)
}

func (self *MockContainerHandler) String() string {
func (self *MockContainerHandler) Type() ContainerType {
args := self.Called()
return args.Get(0).(string)
return args.Get(0).(ContainerType)
}

type FactoryForMockContainerHandler struct {
Expand Down
4 changes: 4 additions & 0 deletions container/raw/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,7 @@ func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]i
func (self *rawContainerHandler) Exists() bool {
return common.CgroupExists(self.cgroupPaths)
}

// Type reports this handler's container type, which is always
// container.ContainerTypeRaw.
func (self *rawContainerHandler) Type() container.ContainerType {
return container.ContainerTypeRaw
}
2 changes: 1 addition & 1 deletion container/rkt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
const (
defaultRktAPIServiceAddr = "localhost:15441"
timeout = 2 * time.Second
minimumRktBinVersion = "1.5.0"
minimumRktBinVersion = "1.6.0"
)

var (
Expand Down
12 changes: 3 additions & 9 deletions container/rkt/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package rkt

import (
"fmt"
"strings"

"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/libcontainer"
Expand Down Expand Up @@ -59,14 +58,9 @@ func (self *rktFactory) NewContainerHandler(name string, inHostNamespace bool) (
}

func (self *rktFactory) CanHandleAndAccept(name string) (bool, bool, error) {
// will ignore all cgroup names that don't either correspond to the machine.slice that is the pod or the containers that belong to the pod
// only works for machined rkt pods at the moment
accept, err := verifyPod(name)

if strings.HasPrefix(name, "/machine.slice/machine-rkt\\x2d") {
accept, err := verifyName(name)
return accept, accept, err
}
return false, false, fmt.Errorf("%s not handled by rkt handler", name)
return accept, accept, err
}

func (self *rktFactory) DebugInfo() map[string][]string {
Expand Down Expand Up @@ -100,6 +94,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno
ignoreMetrics: ignoreMetrics,
rktPath: rktPath,
}
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw})
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Rkt})
return nil
}
32 changes: 20 additions & 12 deletions container/rkt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,19 @@ func newRktContainerHandler(name string, rktClient rktapi.PublicAPIClient, rktPa
})
if err != nil {
return nil, err
} else {
var annotations []*rktapi.KeyValue
if parsed.Container == "" {
pid = int(resp.Pod.Pid)
apiPod = resp.Pod
annotations = resp.Pod.Annotations
}
annotations := resp.Pod.Annotations
if parsed.Container != "" { // As not empty string, an App container
if contAnnotations, ok := findAnnotations(resp.Pod.Apps, parsed.Container); !ok {
glog.Warningf("couldn't find app %v in pod", parsed.Container)
} else {
var ok bool
if annotations, ok = findAnnotations(resp.Pod.Apps, parsed.Container); !ok {
glog.Warningf("couldn't find application in Pod matching %v", parsed.Container)
}
annotations = append(annotations, contAnnotations...)
}
labels = createLabels(annotations)
} else { // The Pod container
pid = int(resp.Pod.Pid)
apiPod = resp.Pod
}
labels = createLabels(annotations)

cgroupPaths := common.MakeCgroupPaths(cgroupSubsystems.MountPoints, name)

Expand Down Expand Up @@ -195,7 +194,12 @@ func (handler *rktContainerHandler) Cleanup() {
func (handler *rktContainerHandler) GetSpec() (info.ContainerSpec, error) {
hasNetwork := handler.hasNetwork && !handler.ignoreMetrics.Has(container.NetworkUsageMetrics)
hasFilesystem := !handler.ignoreMetrics.Has(container.DiskUsageMetrics)
return common.GetSpec(handler.cgroupPaths, handler.machineInfoFactory, hasNetwork, hasFilesystem)

spec, err := common.GetSpec(handler.cgroupPaths, handler.machineInfoFactory, hasNetwork, hasFilesystem)

spec.Labels = handler.labels

return spec, err
}

func (handler *rktContainerHandler) getFsStats(stats *info.ContainerStats) error {
Expand Down Expand Up @@ -269,3 +273,7 @@ func (handler *rktContainerHandler) ListProcesses(listType container.ListType) (
func (handler *rktContainerHandler) Exists() bool {
return common.CgroupExists(handler.cgroupPaths)
}

// Type reports this handler's container type, which is always
// container.ContainerTypeRkt.
func (handler *rktContainerHandler) Type() container.ContainerType {
return container.ContainerTypeRkt
}
86 changes: 67 additions & 19 deletions container/rkt/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,90 @@ import (
"path"
"strings"

rktapi "github.com/coreos/rkt/api/v1alpha"
"github.com/golang/glog"
"golang.org/x/net/context"
)

type parsedName struct {
Pod string
Container string
}

func verifyName(name string) (bool, error) {
_, err := parseName(name)
return err == nil, err
// verifyPod reports whether the cgroup called name should be accepted.
//
// It returns (false, err) when cgroupToPod fails, (false, nil) when
// cgroupToPod finds no pod for the cgroup, and otherwise accepts every name
// except those ending in "/system.slice".
func verifyPod(name string) (bool, error) {
	pod, err := cgroupToPod(name)
	if err != nil {
		return false, err
	}
	if pod == nil {
		return false, nil
	}

	// Anything the handler can handle is also accepted.
	// Cgroups beneath the pod cgroup are accepted, with the exception of
	// "system.slice", which doesn't contain any processes itself.
	if strings.HasSuffix(name, "/system.slice") {
		return false, nil
	}
	return true, nil
}

// cgroupToPod queries the rkt API service (obtained via Client) for pods in
// the running state, filtered by PodSubCgroups set to name.
//
// It returns (nil, nil) when the service reports no pods, the pod itself when
// exactly one is reported, and an error when the client cannot be obtained,
// the ListPods call fails, or more than one pod comes back.
func cgroupToPod(name string) (*rktapi.Pod, error) {
	client, err := Client()
	if err != nil {
		return nil, fmt.Errorf("couldn't get rkt api service: %v", err)
	}

	filter := &rktapi.PodFilter{
		States:        []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING},
		PodSubCgroups: []string{name},
	}
	request := &rktapi.ListPodsRequest{
		Filters: []*rktapi.PodFilter{filter},
	}

	resp, err := client.ListPods(context.Background(), request)
	if err != nil {
		return nil, fmt.Errorf("failed to list pods: %v", err)
	}

	switch count := len(resp.Pods); count {
	case 0:
		// No running pod owns this cgroup; that is not an error.
		return nil, nil
	case 1:
		return resp.Pods[0], nil
	default:
		return nil, fmt.Errorf("returned %d (expected 1) pods for cgroup %v", count, name)
	}
}

/* Parse cgroup name into a pod/container name struct
Example cgroup fs name
pod - /sys/fs/cgroup/cpu/machine.slice/machine-rkt\\x2df556b64a\\x2d17a7\\x2d47d7\\x2d93ec\\x2def2275c3d67e.scope/
container under pod - /sys/fs/cgroup/cpu/machine.slice/machine-rkt\\x2df556b64a\\x2d17a7\\x2d47d7\\x2d93ec\\x2def2275c3d67e.scope/system.slice/alpine-sh.service
pod - /machine.slice/machine-rkt\\x2df556b64a\\x2d17a7\\x2d47d7\\x2d93ec\\x2def2275c3d67e.scope/
or /system.slice/k8s-..../
container under pod - /machine.slice/machine-rkt\\x2df556b64a\\x2d17a7\\x2d47d7\\x2d93ec\\x2def2275c3d67e.scope/system.slice/alpine-sh.service
or /system.slice/k8s-..../system.slice/pause.service
*/
//TODO{sjpotter}: this currently only recognizes machined started pods, which actually doesn't help with k8s which uses them as systemd services, need a solution for both
func parseName(name string) (*parsedName, error) {
pod, err := cgroupToPod(name)
if err != nil {
return nil, fmt.Errorf("parseName: couldn't convert %v to a rkt pod: %v", name, err)
}
if pod == nil {
return nil, fmt.Errorf("parseName: didn't return a pod for %v", name)
}

splits := strings.Split(name, "/")

parsed := &parsedName{}

if len(splits) == 3 || len(splits) == 5 {
parsed := &parsedName{}

if splits[1] == "machine.slice" {
replacer := strings.NewReplacer("machine-rkt\\x2d", "", ".scope", "", "\\x2d", "-")
parsed.Pod = replacer.Replace(splits[2])
if len(splits) == 3 {
return parsed, nil
}
if splits[3] == "system.slice" {
parsed.Container = strings.Replace(splits[4], ".service", "", -1)
return parsed, nil
}
parsed.Pod = pod.Id

if len(splits) == 5 {
parsed.Container = strings.Replace(splits[4], ".service", "", -1)
}

return parsed, nil
}

return nil, fmt.Errorf("%s not handled by rkt handler", name)
Expand All @@ -80,7 +128,7 @@ func getRootFs(root string, parsed *parsedName) string {

bytes, err := ioutil.ReadFile(tree)
if err != nil {
glog.Infof("ReadFile failed, couldn't read %v to get upper dir: %v", tree, err)
glog.Errorf("ReadFile failed, couldn't read %v to get upper dir: %v", tree, err)
return ""
}

Expand Down
44 changes: 43 additions & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/google/cadvisor/machine"
"github.com/google/cadvisor/manager/watcher"
rawwatcher "github.com/google/cadvisor/manager/watcher/raw"
rktwatcher "github.com/google/cadvisor/manager/watcher/rkt"
"github.com/google/cadvisor/utils/cpuload"
"github.com/google/cadvisor/utils/oomparser"
"github.com/google/cadvisor/utils/sysfs"
Expand Down Expand Up @@ -238,6 +239,12 @@ func (self *manager) Start() error {
err = rkt.Register(self, self.fsInfo, self.ignoreMetrics)
if err != nil {
glog.Errorf("Registration of the rkt container factory failed: %v", err)
} else {
watcher, err := rktwatcher.NewRktContainerWatcher()
if err != nil {
return err
}
self.containerWatchers = append(self.containerWatchers, watcher)
}

err = systemd.Register(self, self.fsInfo, self.ignoreMetrics)
Expand Down Expand Up @@ -783,6 +790,35 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
return nil
}

// overrideContainer enables overwriting an existing containerData/Handler
// object for a given containerName.
//
// createContainer can't be used for this, as it just returns if a given
// containerName has a handler already. Ex: the rkt handler will want to take
// priority over the raw handler, but the raw handler might be created first.
//
// Only the raw handler is allowed to be overridden: if the existing handler
// reports any other type, nothing is changed and nil is returned. If no
// entry exists yet, the container is simply created.
//
// It returns an error if destroying the existing entry fails, or whatever
// createContainerLocked returns.
func (m *manager) overrideContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
	m.containersLock.Lock()
	defer m.containersLock.Unlock()

	namespacedName := namespacedContainerName{
		Name: containerName,
	}

	// One comma-ok lookup yields both the entry and whether it is present.
	if existing, ok := m.containers[namespacedName]; ok {
		if existing.handler.Type() != container.ContainerTypeRaw {
			return nil
		}

		if err := m.destroyContainerLocked(containerName); err != nil {
			return fmt.Errorf("overrideContainer: failed to destroy containerData/handler for %v: %v", containerName, err)
		}
	}

	return m.createContainerLocked(containerName, watchSource)
}

// Create a container.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
m.containersLock.Lock()
Expand Down Expand Up @@ -1008,7 +1044,13 @@ func (self *manager) watchForNewContainers(quit chan error) error {
case event := <-self.eventsChannel:
switch {
case event.EventType == watcher.ContainerAdd:
err = self.createContainer(event.Name, event.WatchSource)
switch event.WatchSource {
// the Rkt and Raw watchers can race, and if Raw wins, we want Rkt to override and create a new handler for Rkt containers
case watcher.Rkt:
err = self.overrideContainer(event.Name, event.WatchSource)
default:
err = self.createContainer(event.Name, event.WatchSource)
}
case event.EventType == watcher.ContainerDelete:
err = self.destroyContainer(event.Name)
}
Expand Down
Loading

0 comments on commit 6fa3687

Please sign in to comment.