Skip to content

Commit

Permalink
Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Salway <jacob.salway@gmail.com>
  • Loading branch information
jacobsalway committed Aug 2, 2024
1 parent d8122b6 commit e7d9cc5
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,10 @@ func start() {
if enableBatchScheduler {
registry = scheduler.GetRegistry()

// Register volcano scheduler.
// Register Volcano and Yunikorn schedulers.
registry.Register(common.VolcanoSchedulerName, volcano.Factory)
// TODO(jacobsalway): enable once the integration is ready
//registry.Register(yunikorn.SchedulerName, yunikorn.Factory)
}

// Setup controller for SparkApplication.
Expand Down
33 changes: 33 additions & 0 deletions internal/scheduler/yunikorn/resource_usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package yunikorn

import "github.com/kubeflow/spark-operator/api/v1beta2"

func getInitialExecutors(app *v1beta2.SparkApplication) int32 {
// Take the max of the number of executors and both the initial and minimum number of executors from
// dynamic allocation. See the upstream Spark code below for reference.
// https://github.com/apache/spark/blob/bc187013da821eba0ffff2408991e8ec6d2749fe/core/src/main/scala/org/apache/spark/util/Utils.scala#L2539-L2542
initialExecutors := int32(0)

if app.Spec.Executor.Instances != nil {
initialExecutors = max(initialExecutors, *app.Spec.Executor.Instances)
}

if app.Spec.DynamicAllocation != nil {
if app.Spec.DynamicAllocation.MinExecutors != nil {
initialExecutors = max(initialExecutors, *app.Spec.DynamicAllocation.MinExecutors)
}
if app.Spec.DynamicAllocation.InitialExecutors != nil {
initialExecutors = max(initialExecutors, *app.Spec.DynamicAllocation.InitialExecutors)
}
}

return initialExecutors
}

func driverPodResourceUsage(_ *v1beta2.SparkApplication) (map[string]string, error) {
return nil, nil
}

func executorPodResourceUsage(_ *v1beta2.SparkApplication) (map[string]string, error) {
return nil, nil
}
117 changes: 117 additions & 0 deletions internal/scheduler/yunikorn/resource_usage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package yunikorn

import (
"github.com/kubeflow/spark-operator/pkg/util"
"github.com/stretchr/testify/assert"
"testing"

"github.com/kubeflow/spark-operator/api/v1beta2"
)

func TestGetInitialExecutors(t *testing.T) {
testCases := []struct {
Name string
App *v1beta2.SparkApplication
Expected int32
}{
{
Name: "Nothing specified",
App: &v1beta2.SparkApplication{},
Expected: 0,
},
{
Name: "Only instances",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
Executor: v1beta2.ExecutorSpec{
Instances: util.Int32Ptr(1),
},
},
},
Expected: 1,
},
{
Name: "Only initial",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
DynamicAllocation: &v1beta2.DynamicAllocation{
InitialExecutors: util.Int32Ptr(1),
},
},
},
Expected: 1,
},
{
Name: "Only min",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
DynamicAllocation: &v1beta2.DynamicAllocation{
MinExecutors: util.Int32Ptr(1),
},
},
},
Expected: 1,
},
{
Name: "Instances and initial",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
Executor: v1beta2.ExecutorSpec{
Instances: util.Int32Ptr(2),
},
DynamicAllocation: &v1beta2.DynamicAllocation{
InitialExecutors: util.Int32Ptr(1),
},
},
},
Expected: 2,
},
{
Name: "Instances and min",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
Executor: v1beta2.ExecutorSpec{
Instances: util.Int32Ptr(2),
},
DynamicAllocation: &v1beta2.DynamicAllocation{
MinExecutors: util.Int32Ptr(1),
},
},
},
Expected: 2,
},
{
Name: "Initial and min",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
DynamicAllocation: &v1beta2.DynamicAllocation{
InitialExecutors: util.Int32Ptr(2),
MinExecutors: util.Int32Ptr(1),
},
},
},
Expected: 2,
},
{
Name: "All",
App: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
Executor: v1beta2.ExecutorSpec{
Instances: util.Int32Ptr(3),
},
DynamicAllocation: &v1beta2.DynamicAllocation{
InitialExecutors: util.Int32Ptr(2),
MinExecutors: util.Int32Ptr(1),
},
},
},
Expected: 3,
},
}

for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
assert.Equal(t, tc.Expected, getInitialExecutors(tc.App))
})
}
}
143 changes: 143 additions & 0 deletions internal/scheduler/yunikorn/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package yunikorn

import (
"encoding/json"
"fmt"

v1 "k8s.io/api/core/v1"

"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/internal/scheduler"
)

const (
SchedulerName = "yunikorn"

DriverTaskGroupName = "spark-driver"
ExecutorTaskGroupName = "spark-executor"

TaskGroupNameAnnotation = "yunikorn.apache.org/task-group-name"
TaskGroupsAnnotation = "yunikorn.apache.org/task-groups"

QueueLabel = "queue"
)

// This struct has been defined separately rather than imported so that tags can be included for JSON marshalling
// https://github.com/apache/yunikorn-k8shim/blob/207e4031c6484c965fca4018b6b8176afc5956b4/pkg/cache/amprotocol.go#L47-L56
type taskGroup struct {
Name string `json:"name"`
MinMember int32 `json:"minMember"`
MinResource map[string]string `json:"minResource,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Affinity *v1.Affinity `json:"affinity,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}

type Scheduler struct{}

// Ensure the Yunikorn scheduler implements the required interface
var _ scheduler.Interface = &Scheduler{}

func Factory(_ scheduler.Config) (scheduler.Interface, error) {
return &Scheduler{}, nil
}

func (s *Scheduler) Name() string {
return SchedulerName
}

func (s *Scheduler) ShouldSchedule(_ *v1beta2.SparkApplication) bool {
// Yunikorn gets all the information it needs from pod annotations, so
// there are no additional resources to be created
return true
}

func (s *Scheduler) Schedule(app *v1beta2.SparkApplication) error {
driverMinResources, err := driverPodResourceUsage(app)
if err != nil {
return fmt.Errorf("failed to calculate driver pod resource usage: %w", err)
}

taskGroups := []taskGroup{
{
Name: DriverTaskGroupName,
MinMember: 1,
MinResource: driverMinResources,
NodeSelector: mergeMaps(app.Spec.NodeSelector, app.Spec.Driver.NodeSelector),
Tolerations: app.Spec.Driver.Tolerations,
Affinity: app.Spec.Driver.Affinity,
Labels: app.Spec.Driver.Labels,
},
}

// A minMember of zero is not a valid config for a Yunikorn task group,
// so we should leave out the executor task group completely
// if the initial number of executors is zero
if initialExecutors := getInitialExecutors(app); initialExecutors > 0 {
executorMinResources, err := executorPodResourceUsage(app)
if err != nil {
return fmt.Errorf("failed to calculate executor pod resource usage: %w", err)
}

taskGroups = append(taskGroups, taskGroup{
Name: ExecutorTaskGroupName,
MinMember: initialExecutors,
MinResource: executorMinResources,
NodeSelector: mergeMaps(app.Spec.NodeSelector, app.Spec.Executor.NodeSelector),
Tolerations: app.Spec.Executor.Tolerations,
Affinity: app.Spec.Executor.Affinity,
Labels: app.Spec.Executor.Labels,
})
}

if err := addTaskGroupAnnotations(app, taskGroups); err != nil {
return fmt.Errorf("failed to add task group annotations: %w", err)
}
addQueueLabels(app)

return nil
}

func (s *Scheduler) Cleanup(_ *v1beta2.SparkApplication) error {
// No additional resources are created so there's nothing to be cleaned up
return nil
}

func addTaskGroupAnnotations(app *v1beta2.SparkApplication, taskGroups []taskGroup) error {
marshalledTaskGroups, err := json.Marshal(taskGroups)
if err != nil {
return fmt.Errorf("failed to marshal taskGroups: %w", err)
}

if app.Spec.Driver.Annotations == nil {
app.Spec.Driver.Annotations = make(map[string]string)
}

if app.Spec.Executor.Annotations == nil {
app.Spec.Executor.Annotations = make(map[string]string)
}

app.Spec.Driver.Annotations[TaskGroupNameAnnotation] = DriverTaskGroupName
app.Spec.Executor.Annotations[TaskGroupNameAnnotation] = ExecutorTaskGroupName

// The task group annotation only needs to be present on the originating pod
app.Spec.Driver.Annotations[TaskGroupsAnnotation] = string(marshalledTaskGroups)

return nil
}

func addQueueLabels(app *v1beta2.SparkApplication) {
if app.Spec.BatchSchedulerOptions != nil && app.Spec.BatchSchedulerOptions.Queue != nil {
if app.Spec.Driver.Labels == nil {
app.Spec.Driver.Labels = make(map[string]string)
}

if app.Spec.Executor.Labels == nil {
app.Spec.Executor.Labels = make(map[string]string)
}

app.Spec.Driver.Labels[QueueLabel] = *app.Spec.BatchSchedulerOptions.Queue
app.Spec.Executor.Labels[QueueLabel] = *app.Spec.BatchSchedulerOptions.Queue
}
}
16 changes: 16 additions & 0 deletions internal/scheduler/yunikorn/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package yunikorn

import "maps"

func mergeMaps(m1, m2 map[string]string) map[string]string {
out := make(map[string]string)

maps.Copy(out, m1)
maps.Copy(out, m2)

// Return nil if there are no keys so the struct field is skipped JSON marshalling
if len(out) == 0 {
return nil
}
return out
}
45 changes: 45 additions & 0 deletions internal/scheduler/yunikorn/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package yunikorn

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMergeMaps(t *testing.T) {
testCases := []struct {
m1 map[string]string
m2 map[string]string
expected map[string]string
}{
{
m1: map[string]string{},
m2: map[string]string{},
expected: nil,
},
{
m1: map[string]string{"key1": "value1"},
m2: map[string]string{},
expected: map[string]string{"key1": "value1"},
},
{
m1: map[string]string{},
m2: map[string]string{"key1": "value1"},
expected: map[string]string{"key1": "value1"},
},
{
m1: map[string]string{"key1": "value1"},
m2: map[string]string{"key2": "value2"},
expected: map[string]string{"key1": "value1", "key2": "value2"},
},
{
m1: map[string]string{"key1": "value1"},
m2: map[string]string{"key1": "value2", "key2": "value2"},
expected: map[string]string{"key1": "value2", "key2": "value2"},
},
}

for _, tc := range testCases {
assert.Equal(t, tc.expected, mergeMaps(tc.m1, tc.m2))
}
}

0 comments on commit e7d9cc5

Please sign in to comment.