Skip to content

Commit

Permalink
Fix: spark application does not respect time to live seconds (kubeflow#2165)
Browse files Browse the repository at this point in the history

* Add time to live seconds example spark application

Signed-off-by: Yi Chen <github@chenyicn.net>

* fix: spark application does not respect time to live seconds

Signed-off-by: Yi Chen <github@chenyicn.net>

---------

Signed-off-by: Yi Chen <github@chenyicn.net>
  • Loading branch information
ChenYi015 authored Sep 19, 2024
1 parent a2f71c6 commit c855ee4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 73 deletions.
37 changes: 37 additions & 0 deletions examples/spark-pi-ttl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-ttl
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  sparkVersion: 3.5.2
  timeToLiveSeconds: 30
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    memory: 512m
125 changes: 52 additions & 73 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,11 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c
if err := r.updateSparkApplicationState(ctx, app); err != nil {
return err
}

if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}

return nil
},
)
Expand Down Expand Up @@ -529,85 +531,62 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
}

// reconcileCompletedSparkApplication handles a reconcile request by delegating
// to reconcileTerminatedSparkApplication, the same handler that
// reconcileFailedSparkApplication uses.
//
// It deliberately carries no logic of its own: an inline copy of the
// termination handling here returned an empty ctrl.Result after cleaning up,
// so nothing ever re-triggered reconciliation once the application's
// time-to-live had elapsed. The shared handler performs the state check
// (util.IsTerminated), the expiry check and deletion, and schedules the
// requeue needed for TTL-based deletion.
//
// The returned ctrl.Result and error are those of the shared handler,
// unchanged.
func (r *Reconciler) reconcileCompletedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	return r.reconcileTerminatedSparkApplication(ctx, req)
}

// reconcileFailedSparkApplication forwards the reconcile request to
// reconcileTerminatedSparkApplication and hands back whatever that handler
// returns. It performs no state check of its own.
func (r *Reconciler) reconcileFailedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	result, err := r.reconcileTerminatedSparkApplication(ctx, req)
	return result, err
}

func (r *Reconciler) reconcileTerminatedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
old, err := r.getSparkApplication(key)
if err != nil {
return err
}
if old.Status.AppState.State != v1beta2.ApplicationStateFailed {
return nil
}
app := old.DeepCopy()
old, err := r.getSparkApplication(key)
if err != nil {
return ctrl.Result{Requeue: true}, err
}

if util.IsExpired(app) {
logger.Info("Deleting expired SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
if err := r.client.Delete(ctx, app); err != nil {
return err
}
return nil
}
if err := r.updateExecutorState(ctx, app); err != nil {
return err
}
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
if err := r.cleanUpOnTermination(old, app); err != nil {
logger.Error(err, "Failed to clean up resources for SparkApplication", "name", old.Name, "namespace", old.Namespace, "state", old.Status.AppState.State)
return err
}
return nil
},
)
if retryErr != nil {
logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
return ctrl.Result{}, retryErr
app := old.DeepCopy()
if !util.IsTerminated(app) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil

if util.IsExpired(app) {
logger.Info("Deleting expired SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
if err := r.client.Delete(ctx, app); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{}, nil
}

if err := r.updateExecutorState(ctx, app); err != nil {
return ctrl.Result{Requeue: true}, err
}

if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return ctrl.Result{Requeue: true}, err
}

if err := r.cleanUpOnTermination(old, app); err != nil {
logger.Error(err, "Failed to clean up resources for SparkApplication", "name", old.Name, "namespace", old.Namespace, "state", old.Status.AppState.State)
return ctrl.Result{Requeue: true}, err
}

// If termination time or TTL is not set, will not requeue this application.
if app.Status.TerminationTime.IsZero() || app.Spec.TimeToLiveSeconds == nil || *app.Spec.TimeToLiveSeconds <= 0 {
return ctrl.Result{}, nil
}

// Otherwise, requeue the application for subsequent deletion.
now := time.Now()
ttl := time.Duration(*app.Spec.TimeToLiveSeconds) * time.Second
survival := now.Sub(app.Status.TerminationTime.Time)

// If survival time is greater than TTL, requeue the application immediately.
if survival >= ttl {
return ctrl.Result{Requeue: true}, nil
}
// Otherwise, requeue the application after (TTL - survival) seconds.
return ctrl.Result{RequeueAfter: ttl - survival}, nil
}

func (r *Reconciler) reconcileUnknownSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/sparkapplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func GetApplicationState(app *v1beta2.SparkApplication) v1beta2.ApplicationState
return app.Status.AppState.State
}

// IsTerminated reports whether the given SparkApplication is in a terminal
// state, i.e. its application state is either Completed or Failed.
func IsTerminated(app *v1beta2.SparkApplication) bool {
	switch app.Status.AppState.State {
	case v1beta2.ApplicationStateCompleted, v1beta2.ApplicationStateFailed:
		return true
	default:
		return false
	}
}

// IsExpired returns whether the given SparkApplication is expired.
func IsExpired(app *v1beta2.SparkApplication) bool {
// The application has no TTL defined and will never expire.
Expand Down

0 comments on commit c855ee4

Please sign in to comment.