From ad90828e28b7c2a4975e6e22e5a6518a69c159ad Mon Sep 17 00:00:00 2001
From: Yinan Li
Date: Tue, 11 Feb 2020 20:31:50 -0800
Subject: [PATCH] Revert "Fixed the spec update issue in #795 (#804)" (#805)

This reverts commit 1687c0647c11a5c64d6eeb808522fae75051ad6a.
---
 pkg/batchscheduler/interface/interface.go     |  2 +-
 .../volcano/volcano_scheduler.go              | 46 ++++++++++---------
 pkg/controller/sparkapplication/controller.go |  4 +-
 3 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/pkg/batchscheduler/interface/interface.go b/pkg/batchscheduler/interface/interface.go
index 06f09c8c2d..0d0d3fd9be 100755
--- a/pkg/batchscheduler/interface/interface.go
+++ b/pkg/batchscheduler/interface/interface.go
@@ -24,5 +24,5 @@ type BatchScheduler interface {
 	Name() string
 
 	ShouldSchedule(app *v1beta2.SparkApplication) bool
-	DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error
+	DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error)
 }
diff --git a/pkg/batchscheduler/volcano/volcano_scheduler.go b/pkg/batchscheduler/volcano/volcano_scheduler.go
index e8c9023ffe..88ecc020a3 100755
--- a/pkg/batchscheduler/volcano/volcano_scheduler.go
+++ b/pkg/batchscheduler/volcano/volcano_scheduler.go
@@ -18,7 +18,6 @@ package volcano
 
 import (
 	"fmt"
-
 	corev1 "k8s.io/api/core/v1"
 	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -30,7 +29,7 @@ import (
 	volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned"
 
 	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
-	schedulerinterface "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
+	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
 )
 
 const (
@@ -55,36 +54,39 @@ func (v *VolcanoBatchScheduler) ShouldSchedule(app *v1beta2.SparkApplication) bo
 	return true
 }
 
-func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error {
-	if app.Spec.Executor.Annotations == nil {
-		app.Spec.Executor.Annotations = make(map[string]string)
+func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
+	newApp := app.DeepCopy()
+	if newApp.Spec.Executor.Annotations == nil {
+		newApp.Spec.Executor.Annotations = make(map[string]string)
 	}
-	if app.Spec.Driver.Annotations == nil {
-		app.Spec.Driver.Annotations = make(map[string]string)
+	if newApp.Spec.Driver.Annotations == nil {
+		newApp.Spec.Driver.Annotations = make(map[string]string)
 	}
-	if app.Spec.Mode == v1beta2.ClientMode {
-		return v.syncPodGroupInClientMode(app)
-	} else if app.Spec.Mode == v1beta2.ClusterMode {
-		return v.syncPodGroupInClusterMode(app)
+	if newApp.Spec.Mode == v1beta2.ClientMode {
+		return v.syncPodGroupInClientMode(newApp)
+	} else if newApp.Spec.Mode == v1beta2.ClusterMode {
+		return v.syncPodGroupInClusterMode(newApp)
 	}
-	return nil
+	return newApp, nil
 }
 
-func (v *VolcanoBatchScheduler) syncPodGroupInClientMode(app *v1beta2.SparkApplication) error {
-	// We only care about the executor pods in client mode
-	if _, ok := app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
-		if err := v.syncPodGroup(app, 1, getExecutorRequestResource(app)); err == nil {
-			app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
+func (v *VolcanoBatchScheduler) syncPodGroupInClientMode(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
+	//We only care about the executor pods in client mode
+	newApp := app.DeepCopy()
+	if _, ok := newApp.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
+		//Only executor resource will be considered.
+		if err := v.syncPodGroup(newApp, 1, getExecutorRequestResource(app)); err == nil {
+			newApp.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(newApp)
 		} else {
-			return err
+			return nil, err
 		}
 	}
-	return nil
+	return newApp, nil
 }
 
-func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) error {
+func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
 	//We need both mark Driver and Executor when submitting
 	//NOTE: In cluster mode, the initial size of PodGroup is set to 1 in order to schedule driver pod first.
 	if _, ok := app.Spec.Driver.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
@@ -94,10 +96,10 @@ func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkAppl
 			app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
 			app.Spec.Driver.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
 		} else {
-			return err
+			return nil, err
 		}
 	}
-	return nil
+	return app, nil
 }
 
 func (v *VolcanoBatchScheduler) getAppPodGroupName(app *v1beta2.SparkApplication) string {
diff --git a/pkg/controller/sparkapplication/controller.go b/pkg/controller/sparkapplication/controller.go
index 3b45e21a31..bd3a763b82 100755
--- a/pkg/controller/sparkapplication/controller.go
+++ b/pkg/controller/sparkapplication/controller.go
@@ -623,7 +623,7 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
 	v1beta2.SetSparkApplicationDefaults(app)
 
 	if app.PrometheusMonitoringEnabled() {
-		if err := configPrometheusMonitoring(appCopy, c.kubeClient); err != nil {
+		if err := configPrometheusMonitoring(app, c.kubeClient); err != nil {
 			glog.Error(err)
 		}
 	}
@@ -635,6 +635,8 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
 			glog.Errorf("failed to process batch scheduler BeforeSubmitSparkApplication with error %v", err)
 			return app
 		}
+		//Spark submit will use the updated app to submit tasks(Spec will not be updated into API server)
+		app = newApp
 	}
 
 	driverPodName := getDriverPodName(app)
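
Note: the change restored by this patch has DoBatchSchedulingOnSubmission annotate a deep copy of the SparkApplication and return it, so the controller can hand the annotated copy to spark-submit without writing the mutated Spec back to the API server (the "app = newApp" line in controller.go). The Go program below is a minimal, self-contained sketch of that copy-and-return pattern; the struct definitions, the hand-rolled DeepCopy, the placeholder pod-group name, and the literal annotation key are stand-ins for the operator's generated v1beta2 types and the Volcano constants, not the real API.

package main

import "fmt"

// Minimal stand-ins for the operator's generated v1beta2 types; the type and
// field names here are illustrative only.
type SparkPodSpec struct {
	Annotations map[string]string
}

type SparkApplicationSpec struct {
	Driver   SparkPodSpec
	Executor SparkPodSpec
}

type SparkApplication struct {
	Name string
	Spec SparkApplicationSpec
}

// DeepCopy mimics the generated DeepCopy helper: the caller gets an
// independent object, so annotating the copy never mutates the original that
// mirrors what is stored in the API server.
func (app *SparkApplication) DeepCopy() *SparkApplication {
	out := *app
	out.Spec.Driver.Annotations = copyMap(app.Spec.Driver.Annotations)
	out.Spec.Executor.Annotations = copyMap(app.Spec.Executor.Annotations)
	return &out
}

func copyMap(in map[string]string) map[string]string {
	if in == nil {
		return nil
	}
	out := make(map[string]string, len(in))
	for k, v := range in {
		out[k] = v
	}
	return out
}

// doBatchSchedulingOnSubmission follows the copy-and-return shape restored by
// the patch: it annotates a deep copy and returns it instead of editing the
// argument in place. The real scheduler creates or updates a Volcano PodGroup
// here; this sketch only writes a placeholder group-name annotation, and the
// literal key stands in for v1alpha2.GroupNameAnnotationKey.
func doBatchSchedulingOnSubmission(app *SparkApplication) (*SparkApplication, error) {
	newApp := app.DeepCopy()
	if newApp.Spec.Driver.Annotations == nil {
		newApp.Spec.Driver.Annotations = make(map[string]string)
	}
	if newApp.Spec.Executor.Annotations == nil {
		newApp.Spec.Executor.Annotations = make(map[string]string)
	}
	groupName := "spark-" + newApp.Name + "-pg"
	newApp.Spec.Driver.Annotations["scheduling.k8s.io/group-name"] = groupName
	newApp.Spec.Executor.Annotations["scheduling.k8s.io/group-name"] = groupName
	return newApp, nil
}

func main() {
	cached := &SparkApplication{Name: "spark-pi"}

	// The controller would submit with the returned copy ("app = newApp" in
	// the patch) while the cached object keeps its original Spec.
	submitted, err := doBatchSchedulingOnSubmission(cached)
	if err != nil {
		panic(err)
	}
	fmt.Println("cached driver annotations:   ", cached.Spec.Driver.Annotations)
	fmt.Println("submitted driver annotations:", submitted.Spec.Driver.Annotations)
}

Printing both objects shows the cached application keeping its original annotations while only the returned copy carries the pod-group annotation, which is the property the controller relies on when it builds the spark-submit command from the returned application.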