diff --git a/src/cm/container_manager_linux.go b/src/cm/container_manager_linux.go index f2b1f7922..743f98e48 100644 --- a/src/cm/container_manager_linux.go +++ b/src/cm/container_manager_linux.go @@ -31,7 +31,6 @@ import ( utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" - kubecm "k8s.io/kubernetes/pkg/kubelet/cm" "github.com/Mirantis/cri-dockerd/libdocker" ) @@ -98,7 +97,7 @@ func (m *containerManager) doWork() { // EnsureDockerInContainer does two things. // 1. Ensure processes run in the cgroups if m.cgroupsManager is not nil. // 2. Ensure processes have the OOM score applied. - if err := kubecm.EnsureDockerInContainer(version, dockerOOMScoreAdj, m.cgroupsManager); err != nil { + if err := m.ensureDockerInContainer(version, dockerOOMScoreAdj, m.cgroupsManager); err != nil { klog.ErrorS(err, "Unable to ensure the docker processes run in the desired containers") } } diff --git a/src/cm/helpers.go b/src/cm/helpers.go new file mode 100644 index 000000000..8fe45d56b --- /dev/null +++ b/src/cm/helpers.go @@ -0,0 +1 @@ +package cm \ No newline at end of file diff --git a/src/cm/helpers_linux.go b/src/cm/helpers_linux.go new file mode 100644 index 000000000..9dac07f68 --- /dev/null +++ b/src/cm/helpers_linux.go @@ -0,0 +1,211 @@ +/* +Copyright 2021 Mirantis + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "fmt" + "github.com/opencontainers/runc/libcontainer/cgroups" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/util/oom" + "k8s.io/kubernetes/pkg/util/procfs" + "os" + "strconv" +) + +func isProcessRunningInHost(pid int) (bool, error) { + // Get init pid namespace. + initPidNs, err := os.Readlink("/proc/1/ns/pid") + if err != nil { + return false, fmt.Errorf("failed to find pid namespace of init process") + } + klog.V(10).Infof("init pid ns is %q", initPidNs) + processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid)) + if err != nil { + return false, fmt.Errorf("failed to find pid namespace of process %q", pid) + } + klog.V(10).Infof("Pid %d pid ns is %q", pid, processPidNs) + return initPidNs == processPidNs, nil +} + +func getContainer(pid int) (string, error) { + cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + + if cgroups.IsCgroup2UnifiedMode() { + c, found := cgs[""] + if !found { + return "", cgroups.NewNotFoundError("unified") + } + return c, nil + } + + cpu, found := cgs["cpu"] + if !found { + return "", cgroups.NewNotFoundError("cpu") + } + memory, found := cgs["memory"] + if !found { + return "", cgroups.NewNotFoundError("memory") + } + + // since we use this container for accounting, we need to ensure its a unified hierarchy. + if cpu != memory { + return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory) + } + + // on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls) + // cpu and memory accounting is off by default, users may choose to enable it per unit or globally. + // users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true). + // users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true + // we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal. + // for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory + // cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers. + // as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet. + // in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally). + if systemd, found := cgs["name=systemd"]; found { + if systemd != cpu { + klog.Warningf("CPUAccounting not enabled for pid: %d", pid) + } + if systemd != memory { + klog.Warningf("MemoryAccounting not enabled for pid: %d", pid) + } + return systemd, nil + } + + return cpu, nil +} + +func (m *containerManager) ensureDockerInContainer(dockerAPIVersion *utilversion.Version, oomScoreAdj int, manager cgroups.Manager) error { + type process struct{ name, file string } + dockerProcs := []process{{ + "dockerd", + "/var/run/docker.pid", + + }} + + containerdAPIVersion := utilversion.MustParseGeneric("1.23") + if dockerAPIVersion.AtLeast(containerdAPIVersion) { + dockerProcs = append(dockerProcs, process{"containerd", ""}) + } + var errs []error + for _, proc := range dockerProcs { + pids, err := getPidsForProcess(proc.name, proc.file) + if err != nil { + errs = append(errs, fmt.Errorf("failed to get pids for %q: %v", proc.name, err)) + continue + } + + // Move if the pid is not already in the desired container. + for _, pid := range pids { + if err := ensureProcessInContainerWithOOMScore(pid, oomScoreAdj, manager); err != nil { + errs = append(errs, fmt.Errorf("errors moving %q pid: %v", proc.name, err)) + } + } + } + return utilerrors.NewAggregate(errs) +} + +func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager cgroups.Manager) error { + if runningInHost, err := isProcessRunningInHost(pid); err != nil { + // Err on the side of caution. Avoid moving the docker daemon unless we are able to identify its context. + return err + } else if !runningInHost { + // Process is running inside a container. Don't touch that. + klog.V(2).Infof("pid %d is not running in the host namespaces", pid) + return nil + } + + var errs []error + if manager != nil { + cont, err := getContainer(pid) + if err != nil { + errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err)) + } + + name := "" + cgroups, err := manager.GetCgroups() + if err != nil { + errs = append(errs, fmt.Errorf("failed to get cgroups for %d: %v", pid, err)) + } else { + name = cgroups.Name + } + + if cont != name { + err = manager.Apply(pid) + if err != nil { + errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, name, err)) + } + } + } + + // Also apply oom-score-adj to processes + oomAdjuster := oom.NewOOMAdjuster() + klog.V(5).Infof("attempting to apply oom_score_adj of %d to pid %d", oomScoreAdj, pid) + if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil { + klog.V(3).Infof("Failed to apply oom_score_adj %d for pid %d: %v", oomScoreAdj, pid, err) + errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err)) + } + return utilerrors.NewAggregate(errs) +} + +func getPidFromPidFile(pidFile string) (int, error) { + file, err := os.Open(pidFile) + if err != nil { + return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err) + } + defer file.Close() + + data := make([]byte, 1024) + + _, err = file.Read(data) + if err != nil { + return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err) + } + + pid, err := strconv.Atoi(string(data)) + if err != nil { + return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err) + } + + return pid, nil +} + +func getPidsForProcess(name, pidFile string) ([]int, error) { + if len(pidFile) == 0 { + return procfs.PidOf(name) + } + + pid, err := getPidFromPidFile(pidFile) + if err == nil { + return []int{pid}, nil + } + + // Try to lookup pid by process name + pids, err2 := procfs.PidOf(name) + if err2 == nil { + return pids, nil + } + + // Return error from getPidFromPidFile since that should have worked + // and is the real source of the problem. + klog.V(4).Infof("unable to get pid from %s: %v", pidFile, err) + return []int{}, err +}