Skip to content

Commit

Permalink
remove force stop logic (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
goccy authored Aug 31, 2023
1 parent 6cac110 commit 90ef5cb
Showing 1 changed file with 1 addition and 24 deletions.
25 changes: 1 addition & 24 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,23 +422,6 @@ type JobFinalizer struct {
// However, if kubejob itself does not delete Pods, the forced termination process cannot be executed either.
// Therefore, by specifying the finalizer container, you can explicitly terminate the injected container.
func (j *Job) RunWithExecutionHandler(ctx context.Context, handler JobExecutionHandler, finalizer *JobFinalizer) error {
childCtx, cancel := context.WithCancel(ctx)
errCh := make(chan error)
go func() {
errCh <- j.runWithExecutionHandler(childCtx, cancel, handler, finalizer)
}()
select {
case <-ctx.Done():
// stop runWithExecutionHandler safely.
cancel()
return <-errCh
case err := <-errCh:
return err
}
return nil
}

func (j *Job) runWithExecutionHandler(ctx context.Context, cancelFn func(), handler JobExecutionHandler, finalizer *JobFinalizer) error {
executorMap := map[string]*JobExecutor{}
for idx := range j.Job.Spec.Template.Spec.Containers {
executor, err := j.containerToJobExecutor(idx, &j.Job.Spec.Template.Spec.Containers[idx])
Expand All @@ -461,7 +444,6 @@ func (j *Job) runWithExecutionHandler(ctx context.Context, cancelFn func(), hand
var callbackPod *corev1.Pod
j.podRunningCallback = func(pod *corev1.Pod) error {
callbackPod = pod
forceStop := false
executors := []*JobExecutor{}
for _, container := range pod.Spec.Containers {
if executor, exists := executorMap[container.Name]; exists {
Expand All @@ -471,8 +453,7 @@ func (j *Job) runWithExecutionHandler(ctx context.Context, cancelFn func(), hand
executors = append(executors, executor)
} else {
// found injected container.
// Since kubejob cannot handle termination of this container, use forceStop logic
forceStop = true
// Since kubejob cannot handle termination of this container, needs to use finalizer.
}
}
if finalizerExecutor != nil {
Expand All @@ -487,7 +468,6 @@ func (j *Job) runWithExecutionHandler(ctx context.Context, cancelFn func(), hand
}
if err := executor.Stop(); err != nil {
j.logWarn("failed to stop %s", err)
forceStop = true
}
}
if finalizerExecutor != nil {
Expand All @@ -506,9 +486,6 @@ func (j *Job) runWithExecutionHandler(ctx context.Context, cancelFn func(), hand
}
}
}
if forceStop {
cancelFn()
}
}()
if err := handler(ctx, executors); err != nil {
return err
Expand Down

0 comments on commit 90ef5cb

Please sign in to comment.