Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 552f145

Browse files
authored
Transition to Queue if the JobCondition is empty (#387)
Signed-off-by: Kevin Su <pingsutw@apache.org>
1 parent 13465ae commit 552f145

File tree

11 files changed

+240
-48
lines changed

11 files changed

+240
-48
lines changed

go/tasks/plugins/k8s/kfoperators/common/common_operator.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,6 @@ type ReplicaEntry struct {
3131
RestartPolicy commonOp.RestartPolicy
3232
}
3333

34-
// ExtractMPICurrentCondition will return the first job condition for MPI
35-
func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
36-
if jobConditions != nil {
37-
sort.Slice(jobConditions, func(i, j int) bool {
38-
return jobConditions[i].LastTransitionTime.Time.After(jobConditions[j].LastTransitionTime.Time)
39-
})
40-
41-
for _, jc := range jobConditions {
42-
if jc.Status == v1.ConditionTrue {
43-
return jc, nil
44-
}
45-
}
46-
}
47-
48-
return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
49-
}
50-
5134
// ExtractCurrentCondition will return the first job condition for tensorflow/pytorch/mpi
5235
func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
5336
if jobConditions != nil {
@@ -60,14 +43,17 @@ func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.Jo
6043
return jc, nil
6144
}
6245
}
46+
return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
6347
}
64-
65-
return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
48+
return commonOp.JobCondition{}, nil
6649
}
6750

6851
// GetPhaseInfo will return the phase of kubeflow job
6952
func GetPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Time,
7053
taskPhaseInfo pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
54+
if len(currentCondition.Type) == 0 {
55+
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
56+
}
7157
switch currentCondition.Type {
7258
case commonOp.JobCreated:
7359
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil

go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1919
)
2020

21-
func TestExtractMPICurrentCondition(t *testing.T) {
21+
func TestExtractCurrentCondition(t *testing.T) {
2222
jobCreated := commonOp.JobCondition{
2323
Type: commonOp.JobCreated,
2424
Status: corev1.ConditionTrue,
@@ -31,46 +31,39 @@ func TestExtractMPICurrentCondition(t *testing.T) {
3131
jobCreated,
3232
jobRunningActive,
3333
}
34-
currentCondition, err := ExtractMPICurrentCondition(jobConditions)
34+
currentCondition, err := ExtractCurrentCondition(jobConditions)
3535
assert.NoError(t, err)
3636
assert.Equal(t, currentCondition, jobCreated)
3737

3838
jobConditions = nil
39-
currentCondition, err = ExtractMPICurrentCondition(jobConditions)
40-
assert.Error(t, err)
39+
currentCondition, err = ExtractCurrentCondition(jobConditions)
40+
assert.NoError(t, err)
4141
assert.Equal(t, currentCondition, commonOp.JobCondition{})
42-
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
43-
}
4442

45-
func TestExtractCurrentCondition(t *testing.T) {
46-
jobCreated := commonOp.JobCondition{
47-
Type: commonOp.JobCreated,
48-
Status: corev1.ConditionTrue,
49-
}
50-
jobRunningActive := commonOp.JobCondition{
51-
Type: commonOp.JobRunning,
52-
Status: corev1.ConditionFalse,
53-
}
54-
jobConditions := []commonOp.JobCondition{
55-
jobCreated,
56-
jobRunningActive,
57-
}
58-
currentCondition, err := ExtractCurrentCondition(jobConditions)
43+
currentCondition, err = ExtractCurrentCondition(nil)
5944
assert.NoError(t, err)
60-
assert.Equal(t, currentCondition, jobCreated)
45+
assert.Equal(t, currentCondition, commonOp.JobCondition{})
6146

62-
jobConditions = nil
47+
jobUnknown := commonOp.JobCondition{Type: "unknown"}
48+
jobConditions = []commonOp.JobCondition{jobUnknown}
6349
currentCondition, err = ExtractCurrentCondition(jobConditions)
6450
assert.Error(t, err)
6551
assert.Equal(t, currentCondition, commonOp.JobCondition{})
6652
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
6753
}
6854

6955
func TestGetPhaseInfo(t *testing.T) {
56+
jobCreating := commonOp.JobCondition{}
57+
taskPhase, err := GetPhaseInfo(jobCreating, time.Now(), pluginsCore.TaskInfo{})
58+
assert.NoError(t, err)
59+
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
60+
assert.NotNil(t, taskPhase.Info())
61+
assert.Nil(t, err)
62+
7063
jobCreated := commonOp.JobCondition{
7164
Type: commonOp.JobCreated,
7265
}
73-
taskPhase, err := GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
66+
taskPhase, err = GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
7467
assert.NoError(t, err)
7568
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
7669
assert.NotNil(t, taskPhase.Info())
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package common
2+
3+
import (
4+
"time"
5+
6+
pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
7+
"github.com/flyteorg/flytestdlib/config"
8+
)
9+
10+
//go:generate pflags Config --default-var=defaultConfig
11+
12+
var (
13+
defaultConfig = Config{
14+
Timeout: config.Duration{Duration: 1 * time.Minute},
15+
}
16+
17+
configSection = pluginsConfig.MustRegisterSubSection("kf-operator", &defaultConfig)
18+
)
19+
20+
// Config is the shared config for the kubeflow operator plugins (mpi, pytorch, tensorflow), registered under the 'kf-operator' section
21+
type Config struct {
22+
// If kubeflow operator doesn't update the status of the task after this timeout, the task will be considered failed.
23+
Timeout config.Duration `json:"timeout,omitempty"`
24+
}
25+
26+
func GetConfig() *Config {
27+
return configSection.GetConfig().(*Config)
28+
}
29+
30+
func SetConfig(cfg *Config) error {
31+
return configSection.SetConfig(cfg)
32+
}

go/tasks/plugins/k8s/kfoperators/common/config_flags.go

Lines changed: 55 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/tasks/plugins/k8s/kfoperators/common/config_flags_test.go

Lines changed: 116 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/tasks/plugins/k8s/kfoperators/mpi/mpi.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,10 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
209209
if err != nil {
210210
return pluginsCore.PhaseInfoUndefined, err
211211
}
212-
currentCondition, err := common.ExtractMPICurrentCondition(app.Status.Conditions)
212+
if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
213+
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi custom resource since creation time %v", app.CreationTimestamp)
214+
}
215+
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
213216
if err != nil {
214217
return pluginsCore.PhaseInfoUndefined, err
215218
}
@@ -223,7 +226,6 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
223226
}
224227

225228
return common.GetMPIPhaseInfo(currentCondition, occurredAt, taskPhaseInfo)
226-
227229
}
228230

229231
func init() {

go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func dummyMPIJobResource(mpiResourceHandler mpiOperatorResourceHandler,
281281
Status: mpiOp.JobStatus{
282282
Conditions: jobConditions,
283283
ReplicaStatuses: nil,
284-
StartTime: nil,
284+
StartTime: &v1.Time{Time: time.Now()},
285285
CompletionTime: nil,
286286
LastReconcileTime: nil,
287287
},

go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont
231231
return pluginsCore.PhaseInfoUndefined, err
232232
}
233233

234+
if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
235+
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch custom resource since creation time %v", app.CreationTimestamp)
236+
}
234237
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
235238
if err != nil {
236239
return pluginsCore.PhaseInfoUndefined, err

go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,9 @@ func dummyPytorchJobResource(pytorchResourceHandler pytorchOperatorResourceHandl
293293

294294
return &kubeflowv1.PyTorchJob{
295295
ObjectMeta: v1.ObjectMeta{
296-
Name: jobName,
297-
Namespace: jobNamespace,
296+
CreationTimestamp: v1.Time{Time: time.Now()},
297+
Name: jobName,
298+
Namespace: jobNamespace,
298299
},
299300
Spec: resource.(*kubeflowv1.PyTorchJob).Spec,
300301
Status: commonOp.JobStatus{

go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginC
209209
return pluginsCore.PhaseInfoUndefined, err
210210
}
211211

212+
if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
213+
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow custom resource since creation time %v", app.CreationTimestamp)
214+
}
215+
212216
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
213217
if err != nil {
214218
return pluginsCore.PhaseInfoUndefined, err

0 commit comments

Comments
 (0)