Skip to content

Commit

Permalink
Revert "Fixed the spec update issue in kubeflow#795 (kubeflow#804)" (k…
Browse files Browse the repository at this point in the history
…ubeflow#805)

This reverts commit 1687c06.
  • Loading branch information
liyinan926 authored and breetasinha1109 committed Mar 31, 2020
1 parent ed0310d commit ad90828
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/batchscheduler/interface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ type BatchScheduler interface {
Name() string

ShouldSchedule(app *v1beta2.SparkApplication) bool
DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error
DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error)
}
46 changes: 24 additions & 22 deletions pkg/batchscheduler/volcano/volcano_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package volcano

import (
"fmt"

corev1 "k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -30,7 +29,7 @@ import (
volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned"

"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
schedulerinterface "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
)

const (
Expand All @@ -55,36 +54,39 @@ func (v *VolcanoBatchScheduler) ShouldSchedule(app *v1beta2.SparkApplication) bo
return true
}

func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error {
if app.Spec.Executor.Annotations == nil {
app.Spec.Executor.Annotations = make(map[string]string)
func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
newApp := app.DeepCopy()
if newApp.Spec.Executor.Annotations == nil {
newApp.Spec.Executor.Annotations = make(map[string]string)
}

if app.Spec.Driver.Annotations == nil {
app.Spec.Driver.Annotations = make(map[string]string)
if newApp.Spec.Driver.Annotations == nil {
newApp.Spec.Driver.Annotations = make(map[string]string)
}

if app.Spec.Mode == v1beta2.ClientMode {
return v.syncPodGroupInClientMode(app)
} else if app.Spec.Mode == v1beta2.ClusterMode {
return v.syncPodGroupInClusterMode(app)
if newApp.Spec.Mode == v1beta2.ClientMode {
return v.syncPodGroupInClientMode(newApp)
} else if newApp.Spec.Mode == v1beta2.ClusterMode {
return v.syncPodGroupInClusterMode(newApp)
}
return nil
return newApp, nil
}

func (v *VolcanoBatchScheduler) syncPodGroupInClientMode(app *v1beta2.SparkApplication) error {
// We only care about the executor pods in client mode
if _, ok := app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
if err := v.syncPodGroup(app, 1, getExecutorRequestResource(app)); err == nil {
app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
func (v *VolcanoBatchScheduler) syncPodGroupInClientMode(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
//We only care about the executor pods in client mode
newApp := app.DeepCopy()
if _, ok := newApp.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
//Only executor resource will be considered.
if err := v.syncPodGroup(newApp, 1, getExecutorRequestResource(app)); err == nil {
newApp.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(newApp)
} else {
return err
return nil, err
}
}
return nil
return newApp, nil
}

func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) error {
func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
//We need to mark both Driver and Executor when submitting
//NOTE: In cluster mode, the initial size of PodGroup is set to 1 in order to schedule driver pod first.
if _, ok := app.Spec.Driver.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
Expand All @@ -94,10 +96,10 @@ func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkAppl
app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
app.Spec.Driver.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
} else {
return err
return nil, err
}
}
return nil
return app, nil
}

func (v *VolcanoBatchScheduler) getAppPodGroupName(app *v1beta2.SparkApplication) string {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
v1beta2.SetSparkApplicationDefaults(app)

if app.PrometheusMonitoringEnabled() {
if err := configPrometheusMonitoring(appCopy, c.kubeClient); err != nil {
if err := configPrometheusMonitoring(app, c.kubeClient); err != nil {
glog.Error(err)
}
}
Expand All @@ -635,6 +635,8 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
glog.Errorf("failed to process batch scheduler BeforeSubmitSparkApplication with error %v", err)
return app
}
//Spark submit will use the updated app to submit tasks (the Spec will not be updated in the API server)
app = newApp
}

driverPodName := getDriverPodName(app)
Expand Down

0 comments on commit ad90828

Please sign in to comment.