
Commit 471b26c

run worker in launcher pod
1 parent d7a392e
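
This commit adds support for running the MPI worker process inside the launcher pod. The controller now creates the launcher Service for every MPI implementation, a new enableLauncherAsWorker helper decides from the replica specs whether the launcher should also be listed in the generated hostfile, and the examples, unit tests, and integration tests are updated to match. A new example, examples/v2beta1/pi/pi2.yaml, shows a launcher that starts sshd and runs mpirun in the same pod.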

File tree

7 files changed, +168 -27 lines

examples/v2beta1/pi/pi-intel.yaml
examples/v2beta1/pi/pi-mpich.yaml
examples/v2beta1/pi/pi.yaml
examples/v2beta1/pi/pi2.yaml
pkg/controller/mpi_job_controller.go
pkg/controller/mpi_job_controller_test.go
test/integration/mpi_job_controller_test.go

7 files changed

+168
-27
lines changed

examples/v2beta1/pi/pi-intel.yaml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 1Gi
+                memory: 200m
     Worker:
       replicas: 2
       template:

examples/v2beta1/pi/pi-mpich.yaml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 1Gi
+                memory: 200m
     Worker:
       replicas: 2
       template:

examples/v2beta1/pi/pi.yaml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ spec:
             resources:
               limits:
                 cpu: 1
-                memory: 1Gi
+                memory: 200m
     Worker:
       replicas: 2
       template:
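
In the three existing pi examples above, only the launcher's memory limit changes, so the launcher no longer declares the same resources as its workers. Judging from the enableLauncherAsWorker helper introduced in the controller change below, this presumably keeps these examples on the previous behavior: the launcher is not added to the hostfile.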

examples/v2beta1/pi/pi2.yaml

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+apiVersion: kubeflow.org/v2beta1
+kind: MPIJob
+metadata:
+  name: pi2
+spec:
+  slotsPerWorker: 1
+  runPolicy:
+    cleanPodPolicy: Running
+    ttlSecondsAfterFinished: 60
+  sshAuthMountPath: /home/mpiuser/.ssh
+  mpiReplicaSpecs:
+    Launcher:
+      replicas: 1
+      template:
+        spec:
+          containers:
+          - image: mpioperator/mpi-pi:openmpi
+            name: mpi-launcher
+            securityContext:
+              runAsUser: 1000
+            command:
+            - bash
+            args:
+            - -c
+            - "/usr/sbin/sshd -f /home/mpiuser/.sshd_config && mpirun /home/mpiuser/pi"
+            resources:
+              limits:
+                cpu: 1
+                memory: 1Gi
+    Worker:
+      replicas: 2
+      template:
+        spec:
+          containers:
+          - image: mpioperator/mpi-pi:openmpi
+            name: mpi-worker
+            securityContext:
+              runAsUser: 1000
+            command:
+            - /usr/sbin/sshd
+            args:
+            - -De
+            - -f
+            - /home/mpiuser/.sshd_config
+            resources:
+              limits:
+                cpu: 1
+                memory: 1Gi
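
In this new example the launcher first starts sshd and then runs mpirun in the same container, and it declares exactly the same resource limits as the workers (cpu: 1, memory: 1Gi). With the controller change below, matching launcher and worker resources should cause the launcher itself to be listed in the hostfile, so ranks run on the launcher pod as well as on the two workers.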

pkg/controller/mpi_job_controller.go

Lines changed: 42 additions & 11 deletions
@@ -628,15 +628,12 @@ func (c *MPIJobController) syncHandler(key string) error {
 			return err
 		}
 	}
-	if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
-		mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH {
-		// The Intel and MPICH implementations require workers to communicate with the
-		// launcher through its hostname. For that, we create a Service which
-		// has the same name as the launcher's hostname.
-		_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
-		if err != nil {
-			return fmt.Errorf("getting or creating Service to front launcher: %w", err)
-		}
+	// NEW: always create a Service for the launcher, for all implementations and to support running the launcher as a worker.
+	// The Intel and MPICH implementations require workers to communicate with the
+	// launcher through its hostname. For that, we create a Service which
+	// has the same name as the launcher's hostname.
+	if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil {
+		return fmt.Errorf("getting or creating Service to front launcher: %w", err)
 	}
 	if launcher == nil {
 		if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) {
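
Previously the launcher Service existed only for the Intel and MPICH implementations. Since the launcher can now appear in the hostfile by its Service name under any implementation, including OpenMPI, the Service is created unconditionally; the test expectations below change accordingly.
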
@@ -1246,6 +1243,26 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
 	return err
 }
 
+// enableLauncherAsWorker checks whether the worker process should also run in the launcher.
+func enableLauncherAsWorker(mpiJob *kubeflow.MPIJob) bool {
+	// case 1: the job has no workers at all
+	worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
+	if worker == nil || len(worker.Template.Spec.Containers) < 1 {
+		return true
+	}
+
+	// the launcher declares no resources: do not run a worker in it
+	launcher := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]
+	launcherRes := launcher.Template.Spec.Containers[0].Resources
+	if launcherRes.Limits == nil && launcherRes.Requests == nil {
+		return false
+	}
+
+	// case 2: the launcher declares the same resources as the worker
+	workerRes := worker.Template.Spec.Containers[0].Resources
+	return equality.Semantic.DeepEqual(workerRes, launcherRes)
+}
+
 // newConfigMap creates a new ConfigMap containing configurations for an MPIJob
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.
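
A minimal sketch of the decision (not part of this commit), under the assumption that it sits alongside the controller code above; the exampleLauncherAsWorkerDecisions function and the withCPU fixture are hypothetical, introduced only for illustration and similar to the newReplicaSpec helper added to the unit tests further down.

// Illustrative sketch only: assumes the same package and imports as mpi_job_controller.go.
func exampleLauncherAsWorkerDecisions() {
    withCPU := func(cpu string) *kubeflow.ReplicaSpec {
        return &kubeflow.ReplicaSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Resources: corev1.ResourceRequirements{
                            Limits: corev1.ResourceList{
                                corev1.ResourceCPU: resource.MustParse(cpu),
                            },
                        },
                    }},
                },
            },
        }
    }

    // Case 1: no Worker replica spec at all, so the launcher must host the ranks.
    launcherOnly := &kubeflow.MPIJob{Spec: kubeflow.MPIJobSpec{
        MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
            kubeflow.MPIReplicaTypeLauncher: withCPU("1"),
        },
    }}
    fmt.Println(enableLauncherAsWorker(launcherOnly)) // true

    // Case 2: launcher and worker declare identical resources, so the launcher joins the hostfile.
    symmetric := &kubeflow.MPIJob{Spec: kubeflow.MPIJobSpec{
        MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
            kubeflow.MPIReplicaTypeLauncher: withCPU("2"),
            kubeflow.MPIReplicaTypeWorker:   withCPU("2"),
        },
    }}
    fmt.Println(enableLauncherAsWorker(symmetric)) // true

    // Otherwise: the launcher declares different resources, so only workers host ranks.
    asymmetric := &kubeflow.MPIJob{Spec: kubeflow.MPIJobSpec{
        MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
            kubeflow.MPIReplicaTypeLauncher: withCPU("1"),
            kubeflow.MPIReplicaTypeWorker:   withCPU("2"),
        },
    }}
    fmt.Println(enableLauncherAsWorker(asymmetric)) // false
}

The two new TestNewConfigMap cases in the unit tests below exercise the same equal/unequal-resources branches.
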
@@ -1256,12 +1273,26 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
 	if mpiJob.Spec.SlotsPerWorker != nil {
 		slots = int(*mpiJob.Spec.SlotsPerWorker)
 	}
+	// Note that pod.spec.dnsConfig also affects the svc resolution.
+	// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
+	// The launcher can be reached through its hostname or its Service name.
+	if enableLauncherAsWorker(mpiJob) {
+		launcherService := mpiJob.Name + launcherSuffix
+		switch mpiJob.Spec.MPIImplementation {
+		case kubeflow.MPIImplementationOpenMPI:
+			buffer.WriteString(fmt.Sprintf("%s.%s.svc slots=%d\n", launcherService, mpiJob.Namespace, slots))
+		case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
+			buffer.WriteString(fmt.Sprintf("%s.%s.svc:%d\n", launcherService, mpiJob.Namespace, slots))
+		}
+	}
+
 	for i := 0; i < int(workerReplicas); i++ {
+		name := workerName(mpiJob, i)
 		switch mpiJob.Spec.MPIImplementation {
 		case kubeflow.MPIImplementationOpenMPI:
-			buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, workersService, mpiJob.Namespace, slots))
 		case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
-			buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, workersService, mpiJob.Namespace, slots))
 		}
 	}
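
For illustration (not from the commit): given the pi2 example above, created in the default namespace with OpenMPI, two workers, slotsPerWorker: 1, and launcher-as-worker enabled, the generated hostfile should look roughly like this:

pi2-launcher.default.svc slots=1
pi2-worker-0.pi2-worker.default.svc slots=1
pi2-worker-1.pi2-worker.default.svc slots=1

The expected hostfile data in the updated TestNewConfigMap cases below follows the same pattern.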

pkg/controller/mpi_job_controller_test.go

Lines changed: 68 additions & 9 deletions
@@ -25,6 +25,7 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	schedulingv1 "k8s.io/api/scheduling/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"

@@ -526,10 +527,7 @@ func TestAllResourcesCreated(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i))
 	}
-	if implementation == kubeflow.MPIImplementationIntel ||
-		implementation == kubeflow.MPIImplementationMPICH {
-		f.expectCreateServiceAction(newLauncherService(mpiJobCopy))
-	}
+	f.expectCreateServiceAction(newLauncherService(mpiJobCopy))
 	f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy))

 	mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, "MPIJob default/foo is created.")}

@@ -822,10 +820,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
 		t.Fatalf("Failed creating secret")
 	}
 	f.expectCreateSecretAction(secret)
-	if implementation == kubeflow.MPIImplementationIntel ||
-		implementation == kubeflow.MPIImplementationMPICH {
-		f.expectCreateServiceAction(newLauncherService(mpiJob))
-	}
+	f.expectCreateServiceAction(newLauncherService(mpiJob))

 	// expect creating of the launcher
 	fmjc := f.newFakeMPIJobController()

@@ -887,6 +882,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) {

 	// setup objects
 	scheme.Scheme.Default(mpiJob)
+	f.setUpService(newLauncherService(mpiJob))
 	f.setUpService(newWorkersService(mpiJob))

 	cfgMap := newConfigMap(mpiJob, replicas)

@@ -991,6 +987,7 @@ func TestResumeMPIJob(t *testing.T) {
 	// expect the launcher update to resume it
 	launcherCopy := launcher.DeepCopy()
 	launcherCopy.Spec.Suspend = pointer.Bool(false)
+	f.expectCreateServiceAction(newLauncherService(mpiJob))
 	f.expectUpdateJobAction(launcherCopy)

 	// expect an update to add the conditions

@@ -1047,6 +1044,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) {
 	configMap := newConfigMap(mpiJobCopy, replicas)
 	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
 	f.setUpConfigMap(configMap)
+	f.setUpService(newLauncherService(mpiJob))
 	f.setUpService(newWorkersService(mpiJobCopy))
 	secret, err := newSSHAuthSecret(mpiJobCopy)
 	if err != nil {

@@ -1097,6 +1095,7 @@ func TestLauncherActiveWorkerReady(t *testing.T) {

 	mpiJobCopy := mpiJob.DeepCopy()
 	scheme.Scheme.Default(mpiJobCopy)
+	f.setUpService(newLauncherService(mpiJob))
 	f.setUpService(newWorkersService(mpiJobCopy))
 	secret, err := newSSHAuthSecret(mpiJobCopy)
 	if err != nil {

@@ -1156,6 +1155,7 @@ func TestWorkerReady(t *testing.T) {

 	mpiJobCopy := mpiJob.DeepCopy()
 	scheme.Scheme.Default(mpiJobCopy)
+	f.setUpService(newLauncherService(mpiJob))
 	f.setUpService(newWorkersService(mpiJobCopy))
 	secret, err := newSSHAuthSecret(mpiJobCopy)
 	if err != nil {

@@ -1532,20 +1532,71 @@ func TestNewLauncherAndWorker(t *testing.T) {
 	}
 }

+func newReplicaSpec(name string, cpu string) *kubeflow.ReplicaSpec {
+	return &kubeflow.ReplicaSpec{
+		Template: corev1.PodTemplateSpec{
+			Spec: corev1.PodSpec{
+				Containers: []corev1.Container{
+					{
+						Name: name,
+						Resources: corev1.ResourceRequirements{
+							Limits: corev1.ResourceList{
+								corev1.ResourceCPU: resource.MustParse(cpu),
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
 func TestNewConfigMap(t *testing.T) {
 	testCases := map[string]struct {
 		mpiJob         *kubeflow.MPIJob
 		workerReplicas int32
 		wantCM         *corev1.ConfigMap
 	}{
-		"OpenMPI without slots": {
+		"OpenMPI without slots, enable launcher as worker": {
 			mpiJob: &kubeflow.MPIJob{
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      "openmpi-without-slots",
 					Namespace: "tenant-a",
 				},
 				Spec: kubeflow.MPIJobSpec{
 					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "2"),
+						kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
+					},
+				},
+			},
+			workerReplicas: 2,
+			wantCM: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "openmpi-without-slots-config",
+					Namespace: "tenant-a",
+					Labels: map[string]string{
+						"app": "openmpi-without-slots",
+					},
+				},
+				Data: map[string]string{
+					"hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n",
+				},
+			},
+		},
+		"OpenMPI without slots, disable launcher as worker": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "openmpi-without-slots",
+					Namespace: "tenant-a",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
+						kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
+					},
 				},
 			},
 			workerReplicas: 2,

@@ -1571,6 +1622,10 @@ func TestNewConfigMap(t *testing.T) {
 				Spec: kubeflow.MPIJobSpec{
 					SlotsPerWorker:    pointer.Int32(10),
 					MPIImplementation: kubeflow.MPIImplementationIntel,
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
+						kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
+					},
 				},
 			},
 			workerReplicas: 1,

@@ -1596,6 +1651,10 @@ func TestNewConfigMap(t *testing.T) {
 				Spec: kubeflow.MPIJobSpec{
 					SlotsPerWorker:    pointer.Int32(10),
 					MPIImplementation: kubeflow.MPIImplementationMPICH,
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: newReplicaSpec("launcher", "1"),
+						kubeflow.MPIReplicaTypeWorker:   newReplicaSpec("worker", "2"),
+					},
 				},
 			},
 			workerReplicas: 1,

test/integration/mpi_job_controller_test.go

Lines changed: 7 additions & 4 deletions
@@ -17,6 +17,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"strings"
 	"testing"
 	"time"

@@ -895,7 +896,7 @@ func validateMPIJobDependencies(
 	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
 		problems = nil
 		var err error
-		svc, err = getServiceForJob(ctx, kubeClient, job)
+		svc, err = getServiceForJob(ctx, kubeClient, job, "worker")
 		if err != nil {
 			return false, err
 		}

@@ -1026,14 +1027,16 @@ func updatePodsCondition(ctx context.Context, client kubernetes.Interface, pods
 	return nil
 }

-func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob) (*corev1.Service, error) {
+func getServiceForJob(ctx context.Context, client kubernetes.Interface, job *kubeflow.MPIJob, role string) (*corev1.Service, error) {
 	result, err := client.CoreV1().Services(job.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return nil, err
 	}
 	for _, obj := range result.Items {
-		if metav1.IsControlledBy(&obj, job) {
-			return &obj, nil
+		if strings.HasSuffix(obj.Name, role) {
+			if metav1.IsControlledBy(&obj, job) {
+				return &obj, nil
+			}
 		}
 	}
 	return nil, nil
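
Since every MPIJob now owns both a launcher Service and a workers Service, ownership alone no longer identifies a unique Service; getServiceForJob therefore takes a role suffix ("worker" above, or "launcher" for the launcher Service) and filters on the Service name before checking ownership.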
