Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

naming: change mutual cpus to shared cpus #17

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
glog.Fatalf("%v", err)
}

dp, err := deviceplugin.New(args.MutualCPUs)
dp, err := deviceplugin.New(args.SharedCPUs)
if err != nil {
glog.Fatalf("%v", err)
}
Expand All @@ -46,7 +46,7 @@ func parseArgs() *nriplugin.Args {
args := &nriplugin.Args{}
flag.StringVar(&args.PluginName, "name", "", "plugin name to register to NRI")
flag.StringVar(&args.PluginIdx, "idx", "", "plugin index to register to NRI")
flag.StringVar(&args.MutualCPUs, "mutual-cpus", "", "mutual cpus list")
flag.StringVar(&args.SharedCPUs, "shared-cpus", "", "shared cpus list")
flag.Parse()
return args
}
Expand Down
2 changes: 1 addition & 1 deletion deployment/kustomize/base/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
args:
- --name=mixedcpus
- --idx=99
- --mutual-cpus=0
- --shared-cpus=0
- --v=4
- --alsologtostderr
resources:
Expand Down
26 changes: 13 additions & 13 deletions pkg/deviceplugin/deviceplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,37 @@ import (
)

const (
MutualCPUResourceNamespace = "openshift.io"
MutualCPUResourceName = "mutualcpu"
MutualCPUDeviceName = MutualCPUResourceNamespace + "/" + MutualCPUResourceName
EnvVarName = "OPENSHIFT_MUTUAL_CPUS"
SharedCPUResourceNamespace = "openshift.io"
SharedCPUResourceName = "sharedcpu"
SharedCPUDeviceName = SharedCPUResourceNamespace + "/" + SharedCPUResourceName
EnvVarName = "OPENSHIFT_SHARED_CPUS"
)

type MutualCpu struct {
type SharedCpu struct {
cpus cpuset.CPUSet
}

func (mc *MutualCpu) GetResourceNamespace() string {
return MutualCPUResourceNamespace
func (mc *SharedCpu) GetResourceNamespace() string {
return SharedCPUResourceNamespace
}

func (mc *MutualCpu) Discover(pnl chan dpm.PluginNameList) {
pnl <- []string{MutualCPUResourceName}
func (mc *SharedCpu) Discover(pnl chan dpm.PluginNameList) {
pnl <- []string{SharedCPUResourceName}
}

func (mc *MutualCpu) NewPlugin(s string) dpm.PluginInterface {
func (mc *SharedCpu) NewPlugin(s string) dpm.PluginInterface {
return pluginImp{
mutualCpus: &mc.cpus,
sharedCpus: &mc.cpus,
update: make(chan message),
}
}

func New(cpus string) (*dpm.Manager, error) {
mutualCpus, err := cpuset.Parse(cpus)
sharedCpus, err := cpuset.Parse(cpus)
if err != nil {
return nil, err
}
mc := &MutualCpu{cpus: mutualCpus}
mc := &SharedCpu{cpus: sharedCpus}
return dpm.NewManager(mc), nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/deviceplugin/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type message struct {
}

type pluginImp struct {
mutualCpus *cpuset.CPUSet
sharedCpus *cpuset.CPUSet
update chan message
allocatedDevices int
}
Expand All @@ -61,7 +61,7 @@ func (p pluginImp) ListAndWatch(empty *pluginapi.Empty, server pluginapi.DeviceP
u := <-p.update
p.allocatedDevices += u.requestedDevices
if p.allocatedDevices > devicesLimit {
glog.V(2).Infof("Warning: device limit has reached. can not populate more %q makeDevices", MutualCPUDeviceName)
glog.V(2).Infof("Warning: device limit has reached. can not populate more %q makeDevices", SharedCPUDeviceName)
continue
}
// check if more makeDevices are needed
Expand All @@ -88,7 +88,7 @@ func (p pluginImp) Allocate(ctx context.Context, request *pluginapi.AllocateRequ
glog.V(4).Infof("Allocate called with %+v", request)
for range request.ContainerRequests {
containerResponse := &pluginapi.ContainerAllocateResponse{
Envs: map[string]string{"OPENSHIFT_MUTUAL_CPUS": p.mutualCpus.String()},
Envs: map[string]string{"OPENSHIFT_SHARED_CPUS": p.sharedCpus.String()},
}
response.ContainerResponses = append(response.ContainerResponses, containerResponse)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/manifests/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (mf *Manifests) ToObjects() []client.Object {
}

// SetSharedCPUs updates the container args under the
// DaemonSet with a --mutual-cpus value.
// DaemonSet with a --shared-cpus value.
// It returns an error if the cpus are not a valid cpu set.
func (mf *Manifests) SetSharedCPUs(cpus string) error {
set, err := cpuset.Parse(cpus)
Expand All @@ -150,12 +150,12 @@ func (mf *Manifests) SetSharedCPUs(cpus string) error {
var newArgs []string
for _, arg := range cnt.Args {
keyAndValue := strings.Split(arg, "=")
if keyAndValue[0] == "--mutual-cpus" {
if keyAndValue[0] == "--shared-cpus" {
continue
}
newArgs = append(newArgs, arg)
}
newArgs = append(newArgs, fmt.Sprintf("--mutual-cpus=%s", set.String()))
newArgs = append(newArgs, fmt.Sprintf("--shared-cpus=%s", set.String()))
cnt.Args = newArgs
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/manifests/manifests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestSetSharedCPUs(t *testing.T) {
cnt := &mf.DS.Spec.Template.Spec.Containers[0]
for _, arg := range cnt.Args {
keyAndValue := strings.Split(arg, "=")
if keyAndValue[0] == "--mutual-cpus" {
if keyAndValue[0] == "--shared-cpus" {
// we know the format is correct, otherwise Get() would return with an error
gotCPUset, _ = cpuset.Parse(keyAndValue[1])
break
Expand Down
36 changes: 18 additions & 18 deletions pkg/nriplugin/nriplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ const (
// Plugin nriplugin for mixed cpus
type Plugin struct {
Stub stub.Stub
MutualCPUs *cpuset.CPUSet
SharedCPUs *cpuset.CPUSet
}

type Args struct {
PluginName string
PluginIdx string
MutualCPUs string
SharedCPUs string
}

func New(args *Args) (*Plugin, error) {
Expand All @@ -56,15 +56,15 @@ func New(args *Args) (*Plugin, error) {
if args.PluginIdx != "" {
opts = append(opts, stub.WithPluginIdx(args.PluginIdx))
}
c, err := cpuset.Parse(args.MutualCPUs)
c, err := cpuset.Parse(args.SharedCPUs)
if err != nil {
return nil, fmt.Errorf("failed to parse cpuset %q: %w", args.MutualCPUs, err)
return nil, fmt.Errorf("failed to parse cpuset %q: %w", args.SharedCPUs, err)
}
if c.Size() == 0 {
return p, fmt.Errorf("there has to be at least one mutual CPU")
return p, fmt.Errorf("there has to be at least one shared CPU")
}
glog.Infof("node %q mutual CPUs: %q", os.ExpandEnv("$NODE_NAME"), c.String())
p.MutualCPUs = &c
glog.Infof("node %q shared CPUs: %q", os.ExpandEnv("$NODE_NAME"), c.String())
p.SharedCPUs = &c

if p.Stub, err = stub.New(p, opts...); err != nil {
return nil, fmt.Errorf("failed to create plugin stub: %w", err)
Expand All @@ -81,18 +81,18 @@ func (p *Plugin) CreateContainer(pod *api.PodSandbox, ctr *api.Container) (*api.
return adjustment, updates, nil
}
uniqueName := getCtrUniqueName(pod, ctr)
glog.Infof("append mutual cpus to container %q", uniqueName)
err := setMutualCPUs(ctr, p.MutualCPUs, uniqueName)
glog.Infof("append shared cpus to container %q", uniqueName)
err := setSharedCPUs(ctr, p.SharedCPUs, uniqueName)
if err != nil {
return adjustment, updates, fmt.Errorf("CreateContainer: setMutualCPUs failed: %w", err)
return adjustment, updates, fmt.Errorf("CreateContainer: setSharedCPUs failed: %w", err)
}

//Adding mutual cpus without increasing cpuQuota,
//Adding shared cpus without increasing cpuQuota,
//might result with throttling the processes' threads
//if the threads that are running under the mutual cpus
//if the threads that are running under the shared cpus
//oversteps their boundaries, or the threads that are running
//under the reserved cpus consumes the cpuQuota (pretty common in dpdk/latency sensitive applications).
//Since we can't determine the cpuQuota for the mutual cpus
//Since we can't determine the cpuQuota for the shared cpus
//and avoid throttling the process is critical, increasing the cpuQuota to the maximum is the best option.
quota, err := calculateCFSQuota(ctr)
if err != nil {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (p *Plugin) UpdateContainer(pod *api.PodSandbox, ctr *api.Container) ([]*ap
return nil, fmt.Errorf("failed to parse container %q cpuset %w", ctr.Id, err)
}
// bypass updates coming from CPUManager
ctr.Linux.Resources.Cpu.Cpus = curCpus.Union(*p.MutualCPUs).String()
ctr.Linux.Resources.Cpu.Cpus = curCpus.Union(*p.SharedCPUs).String()
quota, err := calculateCFSQuota(ctr)
if err != nil {
return nil, fmt.Errorf("failed to calculate CFS quota: %w", err)
Expand All @@ -167,7 +167,7 @@ func (p *Plugin) UpdateContainer(pod *api.PodSandbox, ctr *api.Container) ([]*ap
return updates, nil
}

func setMutualCPUs(ctr *api.Container, mutualCPUs *cpuset.CPUSet, uniqueName string) error {
func setSharedCPUs(ctr *api.Container, sharedCPUs *cpuset.CPUSet, uniqueName string) error {
lspec := ctr.GetLinux()
if lspec == nil ||
lspec.Resources == nil ||
Expand All @@ -177,13 +177,13 @@ func setMutualCPUs(ctr *api.Container, mutualCPUs *cpuset.CPUSet, uniqueName str
}
ctrCpus := lspec.Resources.Cpu
curCpus, err := cpuset.Parse(ctrCpus.Cpus)
glog.V(4).Infof("container %q cpus ids before applying mutual cpus %q", uniqueName, curCpus.String())
glog.V(4).Infof("container %q cpus ids before applying shared cpus %q", uniqueName, curCpus.String())
if err != nil {
return err
}

ctrCpus.Cpus = curCpus.Union(*mutualCPUs).String()
glog.V(4).Infof("container %q cpus ids after applying mutual cpus %q", uniqueName, ctrCpus.Cpus)
ctrCpus.Cpus = curCpus.Union(*sharedCPUs).String()
glog.V(4).Infof("container %q cpus ids after applying shared cpus %q", uniqueName, ctrCpus.Cpus)
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/nriplugin/nriplugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
func TestCreateContainer(t *testing.T) {
testCases := []struct {
name string
mutualCPUs cpuset.CPUSet
sharedCPUs cpuset.CPUSet
sb *api.PodSandbox
ctr *api.Container
lres *api.LinuxResources
Expand All @@ -44,7 +44,7 @@ func TestCreateContainer(t *testing.T) {
}{
{
name: "pod without annotation",
mutualCPUs: e2ecpuset.MustParse(sampleCPUs),
sharedCPUs: e2ecpuset.MustParse(sampleCPUs),
sb: makePodSandbox("test-sb"),
ctr: makeContainer("test-ctr", withLinuxResources("1,2", 20000)),
lres: nil,
Expand All @@ -57,7 +57,7 @@ func TestCreateContainer(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
p := &Plugin{
Stub: nil,
MutualCPUs: &tc.mutualCPUs,
SharedCPUs: &tc.sharedCPUs,
}
ca, _, err := p.CreateContainer(tc.sb, tc.ctr)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions test/e2e/mixedcpus/mixedcpus.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ var _ = Describe("Mixedcpus", func() {
It("should generate more devices", func() {
By("create deployment which asks more devices than the node has")
pod := pods.Make("pod-test", fxt.NS.Name, pods.WithLimits(corev1.ResourceList{
deviceplugin.MutualCPUDeviceName: resource.MustParse("1"),
deviceplugin.SharedCPUDeviceName: resource.MustParse("1"),
}))
workers, err := nodes.GetWorkers(fxt.Ctx, fxt.Cli)
Expect(err).ToNot(HaveOccurred())
var devicesCap resource.Quantity
for _, worker := range workers {
devicesPerWorker := worker.Status.Capacity.Name(deviceplugin.MutualCPUDeviceName, resource.DecimalSI)
devicesPerWorker := worker.Status.Capacity.Name(deviceplugin.SharedCPUDeviceName, resource.DecimalSI)
devicesCap.Add(*devicesPerWorker)
}
// we want to make sure we exhaust all devices in the cluster,
Expand Down Expand Up @@ -127,14 +127,14 @@ var _ = Describe("Mixedcpus", func() {
Expect(intersect.Equals(sharedCpusSet)).To(BeTrue(), "shared cpu ids: %s, are not presented. pod: %v cpu ids are: %s", sharedCpusSet.String(), fmt.Sprintf("%s/%s", pod2.Namespace, pod2.Name), cpus.String())
})

It("should contain OPENSHIFT_MUTUAL_CPUS environment variable", func() {
It("should contain OPENSHIFT_SHARED_CPUS environment variable", func() {
sharedCpus := e2econfig.SharedCPUs()
Expect(sharedCpus).ToNot(BeEmpty())

sharedCpusSet := e2ecpuset.MustParse(sharedCpus)
out, err := pods.Exec(fxt.K8SCli, pod, []string{"/bin/printenv", "OPENSHIFT_MUTUAL_CPUS"})
out, err := pods.Exec(fxt.K8SCli, pod, []string{"/bin/printenv", "OPENSHIFT_SHARED_CPUS"})
Expect(err).ToNot(HaveOccurred())
Expect(out).ToNot(BeEmpty(), "OPENSHIFT_MUTUAL_CPUS environment variable was not found")
Expect(out).ToNot(BeEmpty(), "OPENSHIFT_SHARED_CPUS environment variable was not found")

envVar := strings.Trim(string(out), "\r\n")
sharedCpusFromEnv, err := cpuset.Parse(envVar)
Expand Down Expand Up @@ -209,7 +209,7 @@ func createDeployment(cli client.Client, ns, name string) *appsv1.Deployment {
pod := pods.Make("pod-test", ns, pods.WithLimits(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("100M"),
deviceplugin.MutualCPUDeviceName: resource.MustParse("1"),
deviceplugin.SharedCPUDeviceName: resource.MustParse("1"),
}))
dp := deployments.Make(name, ns, deployments.WithPodSpec(pod.Spec))
klog.Infof("create deployment %q with a pod requesting for shared cpus", client.ObjectKeyFromObject(dp).String())
Expand Down
Loading