Merge pull request #52 from lyft/multi-v1beta2

merge multi-v1beta2 into master

michaelawilkins authored Apr 12, 2021
2 parents 8e7212b + f7641ce commit d222980

Showing 10 changed files with 488 additions and 59 deletions.
17 changes: 16 additions & 1 deletion pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go
@@ -16,14 +16,29 @@ limitations under the License.
 
 package v1beta2
 
+import (
+	"strings"
+)
+
 // SetSparkApplicationDefaults sets default values for certain fields of a SparkApplication.
 func SetSparkApplicationDefaults(app *SparkApplication) {
 	if app == nil {
 		return
 	}
 
+	var deployMode DeployMode
+	for key, value := range app.Spec.SparkConf {
+		if strings.HasPrefix(key, "spark.submit.deployMode") {
+			deployMode = DeployMode(value)
+		}
+	}
+
 	if app.Spec.Mode == "" {
-		app.Spec.Mode = ClusterMode
+		if deployMode != "" {
+			app.Spec.Mode = deployMode
+		} else {
+			app.Spec.Mode = ClientMode
+		}
 	}
 
 	if app.Spec.RestartPolicy.Type == "" {
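For orientation, here is a minimal sketch of how the merged defaulting behaves. It is written in the style of the test file below but is not part of this PR, and the SparkConf value is purely illustrative.

    package v1beta2

    import "testing"

    // Not from this PR: a deploy mode pinned in SparkConf wins over the built-in default,
    // and with no SparkConf entry an empty Mode now falls back to ClientMode (not ClusterMode).
    func TestDeployModeFromSparkConfOverridesDefault(t *testing.T) {
    	app := &SparkApplication{
    		Spec: SparkApplicationSpec{
    			SparkConf: map[string]string{
    				"spark.submit.deployMode": "cluster", // illustrative user-supplied value
    			},
    		},
    	}

    	SetSparkApplicationDefaults(app)

    	if app.Spec.Mode != ClusterMode {
    		t.Errorf("expected ClusterMode, got %v", app.Spec.Mode)
    	}
    }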
4 changes: 2 additions & 2 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go
@@ -29,14 +29,14 @@ func TestSetSparkApplicationDefaultsNilSparkApplicationShouldNotModifySparkAppli
 	assert.Nil(t, app)
 }
 
-func TestSetSparkApplicationDefaultsEmptyModeShouldDefaultToClusterMode(t *testing.T) {
+func TestSetSparkApplicationDefaultsEmptyModeShouldDefaultToClientMode(t *testing.T) {
 	app := &SparkApplication{
 		Spec: SparkApplicationSpec{},
 	}
 
 	SetSparkApplicationDefaults(app)
 
-	assert.Equal(t, ClusterMode, app.Spec.Mode)
+	assert.Equal(t, ClientMode, app.Spec.Mode)
 }
 
 func TestSetSparkApplicationDefaultsModeShouldNotChangeIfSet(t *testing.T) {
2 changes: 2 additions & 0 deletions pkg/config/constants.go
@@ -143,6 +143,8 @@ const (
 	SparkExecutorJavaOptions = "spark.executor.extraJavaOptions"
 	// SparkExecutorDeleteOnTermination is the Spark configuration for specifying whether executor pods should be deleted in case of failure or normal termination
 	SparkExecutorDeleteOnTermination = "spark.kubernetes.executor.deleteOnTermination"
+	// SparkDriverHost is the Spark configuration used for communicating with the executors and standalone Master.
+	SparkDriverHost = "spark.driver.host"
 )
 
 const (
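A hedged sketch of how the new constant might be consumed when wiring up a client-mode driver. The helper and its argument are illustrative, not taken from this PR, and the import path is assumed from the upstream operator layout.

    package sparkapplication

    import (
    	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config" // assumed module path
    )

    // driverHostConf is illustrative only: executors and a standalone master reach a
    // client-mode driver through spark.driver.host, so the caller supplies a stable
    // host (for example a headless Service name) for the driver pod.
    func driverHostConf(driverHost string) map[string]string {
    	return map[string]string{
    		config.SparkDriverHost: driverHost,
    	}
    }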
150 changes: 98 additions & 52 deletions pkg/controller/sparkapplication/controller.go
@@ -19,6 +19,7 @@ package sparkapplication
 import (
 	"fmt"
 	"os/exec"
+	"strings"
 	"time"
 
 	"github.com/golang/glog"
@@ -64,17 +65,18 @@
 
 // Controller manages instances of SparkApplication.
 type Controller struct {
-	crdClient         crdclientset.Interface
-	kubeClient        clientset.Interface
-	queue             workqueue.RateLimitingInterface
-	cacheSynced       cache.InformerSynced
-	recorder          record.EventRecorder
-	metrics           *sparkAppMetrics
-	applicationLister crdlisters.SparkApplicationLister
-	podLister         v1.PodLister
-	ingressURLFormat  string
-	batchSchedulerMgr *batchscheduler.SchedulerManager
-	subJobManager     submissionJobManager
+	crdClient               crdclientset.Interface
+	kubeClient              clientset.Interface
+	queue                   workqueue.RateLimitingInterface
+	cacheSynced             cache.InformerSynced
+	recorder                record.EventRecorder
+	metrics                 *sparkAppMetrics
+	applicationLister       crdlisters.SparkApplicationLister
+	podLister               v1.PodLister
+	ingressURLFormat        string
+	batchSchedulerMgr       *batchscheduler.SchedulerManager
+	subJobManager           submissionJobManager
+	clientModeSubPodManager clientModeSubmissionPodManager
 }
 
 // NewController creates a new Controller.
@@ -112,13 +114,14 @@ func newSparkApplicationController(
 		"spark-application-controller")
 
 	controller := &Controller{
-		crdClient:         crdClient,
-		kubeClient:        kubeClient,
-		recorder:          eventRecorder,
-		queue:             queue,
-		ingressURLFormat:  ingressURLFormat,
-		batchSchedulerMgr: batchSchedulerMgr,
-		subJobManager:     &realSubmissionJobManager{kubeClient: kubeClient},
+		crdClient:               crdClient,
+		kubeClient:              kubeClient,
+		recorder:                eventRecorder,
+		queue:                   queue,
+		ingressURLFormat:        ingressURLFormat,
+		batchSchedulerMgr:       batchSchedulerMgr,
+		subJobManager:           &realSubmissionJobManager{kubeClient: kubeClient},
+		clientModeSubPodManager: &realClientModeSubmissionPodManager{kubeClient: kubeClient},
 	}
 
 	if metricsConfig != nil {
@@ -150,6 +153,7 @@ func newSparkApplicationController(
 		DeleteFunc: sparkObjectEventHandler.onObjectDeleted,
 	})
 	controller.subJobManager = &realSubmissionJobManager{kubeClient: kubeClient, jobLister: jobInformer.Lister()}
+	controller.clientModeSubPodManager = &realClientModeSubmissionPodManager{kubeClient: kubeClient, podLister: podsInformer.Lister()}
 
 	controller.cacheSynced = func() bool {
 		return crdInformer.Informer().HasSynced() && podsInformer.Informer().HasSynced()
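The clientModeSubPodManager wired up above is only exercised later in this diff; its actual declaration sits in one of the files not rendered on this page. From the call sites visible here, its surface is roughly the sketch below (no methods beyond createClientDriverPod are assumed, and the import path is assumed from the upstream operator layout).

    package sparkapplication

    import (
    	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2" // assumed module path
    )

    // Sketch inferred from usage in this diff, not the PR's actual declaration.
    type clientModeSubmissionPodManager interface {
    	// createClientDriverPod creates the driver pod directly (no submission Job) and
    	// returns the submission ID and the driver pod's name.
    	createClientDriverPod(app *v1beta2.SparkApplication) (submissionID string, driverPodName string, err error)
    }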
@@ -349,8 +353,8 @@ func (c *Controller) getAndUpdateDriverState(app *v1beta2.SparkApplication) erro
 			}
 		}
 	}
-
 	newState := driverStateToApplicationState(driverState)
+
 	// Only record a driver event if the application state (derived from the driver pod phase) has changed.
 	if newState != app.Status.AppState.State {
 		c.recordDriverEvent(app, driverState, driverPod.Name)
@@ -449,10 +453,20 @@ func shouldRetry(app *v1beta2.SparkApplication) bool {
 				return true
 			}
 		}
+	case v1beta2.PendingSubmissionState:
+		var interval int64 = 257
+		if app.Spec.Mode == v1beta2.ClientMode && hasRetryIntervalPassed(&interval, app.Status.SubmissionAttempts, app.CreationTimestamp) && app.Status.SubmissionAttempts < 14 {
+			return true
+		}
 	case v1beta2.FailedSubmissionState:
 		// We retry only if the RestartPolicy is Always. The Submission Job already retries up to the specified OnSubmissionFailureRetries.
 		if app.Spec.RestartPolicy.Type == v1beta2.Always {
 			return true
+		} else if app.Spec.RestartPolicy.Type == v1beta2.OnFailure {
+			if app.Spec.Mode == v1beta2.ClientMode && strings.Contains(app.Status.AppState.ErrorMessage, "exceeded quota") && app.Status.SubmissionAttempts < 14 {
+				return true
+			}
 		}
 	}
 	return false
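The new PendingSubmissionState case above leans on hasRetryIntervalPassed, whose body lies outside this hunk. Consistent with the call site (a 257-second interval, the submission attempt count, and the application's creation timestamp), it presumably implements a linear back-off along these lines; treat this as a sketch, not the repository's exact helper.

    package sparkapplication

    import (
    	"time"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // Sketch only: retry once at least attemptsDone * retryInterval seconds have elapsed
    // since the last relevant event (linear back-off).
    func hasRetryIntervalPassed(retryInterval *int64, attemptsDone int32, lastEventTime metav1.Time) bool {
    	if retryInterval == nil || lastEventTime.IsZero() || attemptsDone <= 0 {
    		return false
    	}
    	interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone)
    	return time.Now().After(lastEventTime.Add(interval))
    }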
@@ -526,33 +540,43 @@ func (c *Controller) syncSparkApplication(key string) error {
 			appToUpdate = c.submitSparkApplication(appToUpdate)
 		}
 	case v1beta2.PendingSubmissionState:
-		// Check the status of the submission Job and set the application status accordingly.
-		succeeded, completionTime, err := c.subJobManager.hasJobSucceeded(appToUpdate)
-
-		if succeeded != nil {
-			// Submission Job terminated in either success or failure.
-			if *succeeded {
-				c.createSparkUIResources(appToUpdate)
-				appToUpdate.Status.AppState.State = v1beta2.SubmittedState
-				appToUpdate.Status.ExecutionAttempts++
-				if completionTime != nil {
-					appToUpdate.Status.SubmissionTime = *completionTime
-				}
-				c.recordSparkApplicationEvent(appToUpdate)
-			} else {
-				// Since we delegate submission retries to the Kubernetes Job controller, the fact that the
-				// submission Job failed means all the submission attempts failed. So we set the application
-				// state to FailedSubmission, which is a terminal state.
-				appToUpdate.Status.AppState.State = v1beta2.FailedSubmissionState
-				if err != nil {
-					// Propagate the error if the submission Job ended in failure after retries.
-					appToUpdate.Status.AppState.ErrorMessage = err.Error()
-				}
-				c.recordSparkApplicationEvent(appToUpdate)
-			}
-		} else if err != nil {
-			// Received an error trying to query the status of the Job.
-			return err
-		}
+		// Resubmission is gated on resource quota: in client mode we wait and check whether the retry interval has passed before rerunning.
+		if app.Spec.Mode == v1beta2.ClientMode {
+			if shouldRetry(appToUpdate) {
+				appToUpdate.Status.AppState.ErrorMessage = ""
+				app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
+				appToUpdate.Status.AppState.State = v1beta2.PendingRerunState
+			}
+		} else {
+			// Check the status of the submission Job and set the application status accordingly.
+			succeeded, completionTime, err := c.subJobManager.hasJobSucceeded(appToUpdate)
+
+			if succeeded != nil {
+				// Submission Job terminated in either success or failure.
+				if *succeeded {
+					c.createSparkUIResources(appToUpdate)
+					appToUpdate.Status.AppState.State = v1beta2.SubmittedState
+					appToUpdate.Status.ExecutionAttempts++
+					if completionTime != nil {
+						appToUpdate.Status.SubmissionTime = *completionTime
+					}
+					c.recordSparkApplicationEvent(appToUpdate)
+				} else {
+					// Since we delegate submission retries to the Kubernetes Job controller, the fact that the
+					// submission Job failed means all the submission attempts failed. So we set the application
+					// state to FailedSubmission, which is a terminal state.
+					appToUpdate.Status.AppState.State = v1beta2.FailedSubmissionState
+					if err != nil {
+						// Propagate the error if the submission Job ended in failure after retries.
+						appToUpdate.Status.AppState.ErrorMessage = err.Error()
+					}
+					c.recordSparkApplicationEvent(appToUpdate)
+				}
+			} else if err != nil {
+				// Received an error trying to query the status of the Job.
+				return err
+			}
+		}
 	case v1beta2.SucceedingState:
 		// The current run of the application has completed, check if it needs to be restarted.
@@ -588,9 +612,14 @@ func (c *Controller) syncSparkApplication(key string) error {
 			appToUpdate.Status.AppState.State = v1beta2.FailedState
 			c.recordSparkApplicationEvent(appToUpdate)
 		} else {
-			// Application is subject to retry. Move to PendingRerunState.
-			appToUpdate.Status.AppState.ErrorMessage = ""
-			appToUpdate.Status.AppState.State = v1beta2.PendingRerunState
+			if appToUpdate.Spec.Mode == v1beta2.ClusterMode {
+				// Application is subject to retry. Move to PendingRerunState.
+				appToUpdate.Status.AppState.ErrorMessage = ""
+				appToUpdate.Status.AppState.State = v1beta2.PendingRerunState
+			} else {
+				// A client-mode failure caused by resource quota must wait before resubmitting.
+				appToUpdate.Status.AppState.State = v1beta2.PendingSubmissionState
+			}
 		}
 	case v1beta2.InvalidatingState:
 		// Invalidate the current run and enqueue the SparkApplication for re-submission.
@@ -672,9 +701,18 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
 		}
 	}
 
-	submissionID, driverPodName, err := c.subJobManager.createSubmissionJob(app)
+	var submissionID string
+	var driverPodName string
+	var err error
+
+	if app.Spec.Mode == v1beta2.ClientMode {
+		submissionID, driverPodName, err = c.clientModeSubPodManager.createClientDriverPod(app)
+	} else {
+		submissionID, driverPodName, err = c.subJobManager.createSubmissionJob(app)
+	}
+
 	if err != nil {
-		if !errors.IsAlreadyExists(err) {
+		if !errors.IsAlreadyExists(err) || app.Spec.Mode == v1beta2.ClientMode {
 			app.Status = v1beta2.SparkApplicationStatus{
 				AppState: v1beta2.ApplicationState{
 					State: v1beta2.FailedSubmissionState,
@@ -683,20 +721,30 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
 				SubmissionAttempts: app.Status.SubmissionAttempts + 1,
 			}
 		}
 
 		c.recordSparkApplicationEvent(app)
 		return app
 	}
 
 	glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
+	var appState v1beta2.ApplicationStateType
+	if app.Spec.Mode == v1beta2.ClientMode {
+		appState = v1beta2.SubmittedState
+	} else {
+		appState = v1beta2.PendingSubmissionState
+	}
 	app.Status = v1beta2.SparkApplicationStatus{
 		SubmissionID:       submissionID,
 		DriverInfo:         v1beta2.DriverInfo{PodName: driverPodName},
-		AppState:           v1beta2.ApplicationState{State: v1beta2.PendingSubmissionState},
+		AppState:           v1beta2.ApplicationState{State: appState},
 		SubmissionAttempts: app.Status.SubmissionAttempts + 1,
 		ExecutionAttempts:  app.Status.ExecutionAttempts + 1,
 	}
 
 	c.recordSparkApplicationEvent(app)
+	if app.Spec.Mode == v1beta2.ClientMode {
+		c.createSparkUIResources(app)
+	}
+
 	return app
 }
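To make the new submit path concrete, here is a test-style sketch of the expected outcome for a client-mode application. It is not a test from this PR; newFakeController stands in for whatever fixture the repository's tests use, and the import path is assumed from the upstream operator layout.

    package sparkapplication

    import (
    	"testing"

    	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2" // assumed module path
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // Sketch only: in client mode the driver pod is created directly, so the application
    // is marked Submitted immediately instead of passing through PendingSubmissionState.
    func TestClientModeSubmissionIsMarkedSubmitted(t *testing.T) {
    	app := &v1beta2.SparkApplication{
    		ObjectMeta: metav1.ObjectMeta{Name: "demo-app", Namespace: "default"},
    		Spec:       v1beta2.SparkApplicationSpec{Mode: v1beta2.ClientMode},
    	}

    	ctrl := newFakeController() // hypothetical fixture, not defined in this diff
    	app = ctrl.submitSparkApplication(app)

    	if app.Status.AppState.State != v1beta2.SubmittedState {
    		t.Errorf("expected SubmittedState, got %v", app.Status.AppState.State)
    	}
    }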
@@ -1004,15 +1052,13 @@ func (c *Controller) recordExecutorEvent(app *v1beta2.SparkApplication, state v1
 func (c *Controller) clearStatus(status *v1beta2.SparkApplicationStatus) {
 	if status.AppState.State == v1beta2.InvalidatingState {
 		status.SparkApplicationID = ""
-		status.SubmissionAttempts = 0
 		status.ExecutionAttempts = 0
 		status.SubmissionTime = metav1.Time{}
 		status.TerminationTime = metav1.Time{}
 		status.AppState.ErrorMessage = ""
 		status.ExecutorState = nil
 	} else if status.AppState.State == v1beta2.PendingRerunState {
 		status.SparkApplicationID = ""
-		status.SubmissionAttempts = 0
 		status.DriverInfo = v1beta2.DriverInfo{}
 		status.AppState.ErrorMessage = ""
 		status.ExecutorState = nil