Commit 9dfb551

add RunLauncherAsWorker in spec
1 parent 471b26c commit 9dfb551

6 files changed (+19 −61 lines):

- examples/v2beta1/pi/pi-intel.yaml
- examples/v2beta1/pi/pi-mpich.yaml
- examples/v2beta1/pi/pi.yaml
- pkg/apis/kubeflow/v2beta1/types.go
- pkg/controller/mpi_job_controller.go
- pkg/controller/mpi_job_controller_test.go

examples/v2beta1/pi/pi-intel.yaml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:
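(Kubernetes parses the `m` suffix as "milli", so the old limit `200m` meant 0.2 bytes of memory, presumably a typo for `200Mi`; the examples switch to a usable `1Gi`. The same fix is applied to pi-mpich.yaml and pi.yaml below.)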

examples/v2beta1/pi/pi-mpich.yaml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
       template:

examples/v2beta1/pi/pi.yaml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 200m
+                memory: 1Gi
     Worker:
       replicas: 2
      template:

pkg/apis/kubeflow/v2beta1/types.go

Lines changed: 6 additions & 0 deletions
@@ -154,6 +154,12 @@ type MPIJobSpec struct {
     // +kubebuilder:default:=1
     SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

+    // RunLauncherAsWorker indicates whether to run a worker process in the launcher.
+    // Defaults to false.
+    // +optional
+    // +kubebuilder:default:=false
+    RunLauncherAsWorker bool `json:"runLauncherAsWorker,omitempty"`
+
     // RunPolicy encapsulates various runtime policies of the job.
     RunPolicy RunPolicy `json:"runPolicy,omitempty"`
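For orientation, a minimal Go sketch (not part of this commit) of setting the new field; the `github.com/kubeflow/mpi-operator/v2` import path is an assumption and may differ, everything else comes from the diff above:

```go
package main

import (
	"fmt"

	kubeflow "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2beta1"
)

func main() {
	// Opt the launcher pod in to also running an MPI worker process.
	spec := kubeflow.MPIJobSpec{
		MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
		RunLauncherAsWorker: true, // omitted => defaults to false
	}
	fmt.Println(spec.RunLauncherAsWorker) // true
}
```

The equivalent manifest field is `runLauncherAsWorker: true` under `spec`.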

pkg/controller/mpi_job_controller.go

Lines changed: 8 additions & 21 deletions
@@ -1243,26 +1243,6 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
     return err
 }

-// enableLauncherAsWorker checks whether to run a worker process in the launcher
-func enableLauncherAsWorker(mpiJob *kubeflow.MPIJob) bool {
-    // case 1: there is no worker
-    worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
-    if worker == nil || len(worker.Template.Spec.Containers) < 1 {
-        return true
-    }
-
-    // case 2: no resource declaration for the launcher
-    launcher := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]
-    launcherRes := launcher.Template.Spec.Containers[0].Resources
-    if launcherRes.Limits == nil && launcherRes.Requests == nil {
-        return false
-    }
-
-    // case 3: the launcher declares the same resources as the worker
-    workerRes := worker.Template.Spec.Containers[0].Resources
-    return equality.Semantic.DeepEqual(workerRes, launcherRes)
-}
-
 // newConfigMap creates a new ConfigMap containing configurations for an MPIJob
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.

@@ -1276,7 +1256,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap
     // note that pod.spec.dnsConfig also affects the svc resolution
     // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
     // the launcher can be reached via its hostname or service name
-    if enableLauncherAsWorker(mpiJob) {
+    if mpiJob.Spec.RunLauncherAsWorker {
         launcherService := mpiJob.Name + launcherSuffix
         switch mpiJob.Spec.MPIImplementation {
         case kubeflow.MPIImplementationOpenMPI:

@@ -1322,6 +1302,13 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob

     var buffer bytes.Buffer
     buffer.WriteString("#!/bin/sh\n")
+
+    // We do not check whether the launcher is running here; the launcher should always be there, or the job has failed
+    if mpiJob.Spec.RunLauncherAsWorker {
+        launcherService := mpiJob.Name + launcherSuffix
+        buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace))
+    }
+
     workersService := mpiJob.Name + workerSuffix
     for _, p := range runningPods {
         buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
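To make the new branch concrete, here is a self-contained sketch of the discover_hosts.sh content the updated function would emit; the `"-launcher"` and `"-worker"` literals are assumptions standing in for the controller's `launcherSuffix` and `workerSuffix` constants:

```go
package main

import (
	"bytes"
	"fmt"
)

// discoverHosts mirrors updateDiscoverHostsInConfigMap above: with
// RunLauncherAsWorker set, the launcher Service hostname is listed
// first, followed by one line per running worker pod.
func discoverHosts(jobName, ns string, runLauncherAsWorker bool, workers []string) string {
	var buffer bytes.Buffer
	buffer.WriteString("#!/bin/sh\n")
	if runLauncherAsWorker {
		// "-launcher" is assumed to match launcherSuffix in the controller.
		buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", jobName+"-launcher", ns))
	}
	for _, p := range workers {
		// "-worker" is assumed to match workerSuffix in the controller.
		buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p, jobName+"-worker", ns))
	}
	return buffer.String()
}

func main() {
	fmt.Print(discoverHosts("pi", "tenant-a", true, []string{"pi-worker-0", "pi-worker-1"}))
	// Output:
	// #!/bin/sh
	// echo pi-launcher.tenant-a.svc
	// echo pi-worker-0.pi-worker.tenant-a.svc
	// echo pi-worker-1.pi-worker.tenant-a.svc
}
```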

pkg/controller/mpi_job_controller_test.go

Lines changed: 2 additions & 37 deletions
@@ -25,7 +25,6 @@ import (
     batchv1 "k8s.io/api/batch/v1"
     corev1 "k8s.io/api/core/v1"
     schedulingv1 "k8s.io/api/scheduling/v1"
-    "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"

@@ -1532,25 +1531,6 @@ func TestNewLauncherAndWorker(t *testing.T) {
     }
 }

-func newReplicaSpec(name string, cpu string) *kubeflow.ReplicaSpec {
-    return &kubeflow.ReplicaSpec{
-        Template: corev1.PodTemplateSpec{
-            Spec: corev1.PodSpec{
-                Containers: []corev1.Container{
-                    {
-                        Name: name,
-                        Resources: corev1.ResourceRequirements{
-                            Limits: corev1.ResourceList{
-                                corev1.ResourceCPU: resource.MustParse(cpu),
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    }
-}
-
 func TestNewConfigMap(t *testing.T) {
     testCases := map[string]struct {
         mpiJob *kubeflow.MPIJob

@@ -1564,11 +1544,8 @@ func TestNewConfigMap(t *testing.T) {
                 Namespace: "tenant-a",
             },
             Spec: kubeflow.MPIJobSpec{
-                MPIImplementation: kubeflow.MPIImplementationOpenMPI,
-                MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                    kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "2"),
-                    kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                },
+                MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+                RunLauncherAsWorker: true,
             },
         },
         workerReplicas: 2,

@@ -1593,10 +1570,6 @@ func TestNewConfigMap(t *testing.T) {
             },
             Spec: kubeflow.MPIJobSpec{
                 MPIImplementation: kubeflow.MPIImplementationOpenMPI,
-                MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                    kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-                    kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                },
             },
         },
         workerReplicas: 2,

@@ -1622,10 +1595,6 @@ func TestNewConfigMap(t *testing.T) {
             Spec: kubeflow.MPIJobSpec{
                 SlotsPerWorker:    pointer.Int32(10),
                 MPIImplementation: kubeflow.MPIImplementationIntel,
-                MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                    kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-                    kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                },
             },
         },
         workerReplicas: 1,

@@ -1651,10 +1620,6 @@ func TestNewConfigMap(t *testing.T) {
             Spec: kubeflow.MPIJobSpec{
                 SlotsPerWorker:    pointer.Int32(10),
                 MPIImplementation: kubeflow.MPIImplementationMPICH,
-                MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-                    kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
-                    kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
-                },
             },
         },
         workerReplicas: 1,
