Skip to content

Commit

Permalink
fix(backend): Fix missing parameters in jobs and recurring runs. Closes
Browse files Browse the repository at this point in the history
#9012. (#9118)

* Add pipeline_id to run and recurring run protos

* Add pipeline_id to run and recurring_run

* Fix params in recurring run

* Add support for v1 and v2 parameters in runs and jobs

* Fix convertion issues

* Update a unit-test

* Revert change of the order in sql

* Address comments
  • Loading branch information
gkcalat authored Apr 10, 2023
1 parent e907b63 commit ce096dd
Show file tree
Hide file tree
Showing 13 changed files with 370 additions and 80 deletions.
6 changes: 2 additions & 4 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,11 +1143,9 @@ func (r *ResourceManager) ReportWorkflowResource(ctx context.Context, execSpec u
namespace = execSpec.ExecutionNamespace()
}
// Scheduled time equals created time if it is not specified
var scheduledTimeInSec int64
if execSpec.ScheduledAtInSecOr0() == 0 {
scheduledTimeInSec := execSpec.ScheduledAtInSecOr0()
if scheduledTimeInSec == 0 {
scheduledTimeInSec = objMeta.CreationTimestamp.Unix()
} else {
scheduledTimeInSec = execSpec.ScheduledAtInSecOr0()
}
run = &model.Run{
UUID: runId,
Expand Down
125 changes: 124 additions & 1 deletion backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3072,7 +3072,6 @@ func TestReportScheduledWorkflowResource_Success(t *testing.T) {
},
PipelineSpec: model.PipelineSpec{
WorkflowSpecManifest: testWorkflow.ToStringForStore(),
Parameters: "[]",
PipelineSpecManifest: actualJob.PipelineSpec.PipelineSpecManifest,
PipelineName: actualJob.PipelineSpec.PipelineName,
},
Expand All @@ -3083,6 +3082,130 @@ func TestReportScheduledWorkflowResource_Success(t *testing.T) {
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
}

func TestReportScheduledWorkflowResource_Success_withParamsV1(t *testing.T) {
store, manager, job := initWithJob(t)
defer store.Close()
// report scheduled workflow
swf := util.NewScheduledWorkflow(&swfapi.ScheduledWorkflow{
TypeMeta: v1.TypeMeta{
APIVersion: "kubeflow.org/v1beta1",
Kind: "ScheduledWorkflow",
},
ObjectMeta: v1.ObjectMeta{
Name: "MY_NAME",
Namespace: "MY_NAMESPACE",
UID: types.UID(job.UUID),
},
Spec: swfapi.ScheduledWorkflowSpec{
Workflow: &swfapi.WorkflowResource{
Parameters: []swfapi.Parameter{
{
Name: "param_v1",
Value: "value_v1",
},
},
},
},
})
err := manager.ReportScheduledWorkflowResource(swf)
assert.Nil(t, err)

actualJob, err := manager.GetJob(job.UUID)
assert.Nil(t, err)

expectedJob := &model.Job{
K8SName: "MY_NAME",
DisplayName: "j1",
Namespace: actualJob.Namespace,
ExperimentId: actualJob.ExperimentId,
ServiceAccount: "pipeline-runner",
Enabled: false,
UUID: actualJob.UUID,
Conditions: "STATUS_UNSPECIFIED",
Trigger: model.Trigger{
CronSchedule: model.CronSchedule{
Cron: util.StringPointer(""),
},
PeriodicSchedule: model.PeriodicSchedule{
IntervalSecond: util.Int64Pointer(0),
},
},
PipelineSpec: model.PipelineSpec{
Parameters: `[{"name":"param_v1","value":"value_v1"}]`,
WorkflowSpecManifest: testWorkflow.ToStringForStore(),
PipelineSpecManifest: actualJob.PipelineSpec.PipelineSpecManifest,
PipelineName: actualJob.PipelineSpec.PipelineName,
},
CreatedAtInSec: 2,
UpdatedAtInSec: 3,
}
expectedJob.Conditions = "STATUS_UNSPECIFIED"
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
}

func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing.T) {
store, manager, job := initWithJobV2(t)
defer store.Close()
// report scheduled workflow
swf := util.NewScheduledWorkflow(&swfapi.ScheduledWorkflow{
TypeMeta: v1.TypeMeta{
APIVersion: "kubeflow.org/v2beta1",
Kind: "ScheduledWorkflow",
},
ObjectMeta: v1.ObjectMeta{
Name: "updated_name",
Namespace: "ns1",
UID: types.UID(job.UUID),
},
Spec: swfapi.ScheduledWorkflowSpec{
Workflow: &swfapi.WorkflowResource{
Parameters: []swfapi.Parameter{
{
Name: "param1",
Value: "\"world-updated\"",
},
},
},
},
})
err := manager.ReportScheduledWorkflowResource(swf)
assert.Nil(t, err)

actualJob, err := manager.GetJob(job.UUID)
assert.Nil(t, err)

expectedJob := &model.Job{
K8SName: "updated_name",
DisplayName: "j1",
Namespace: "ns1",
ExperimentId: job.ExperimentId,
ServiceAccount: "pipeline-runner",
Enabled: false,
UUID: actualJob.UUID,
Conditions: "STATUS_UNSPECIFIED",
Trigger: model.Trigger{
CronSchedule: model.CronSchedule{
Cron: util.StringPointer(""),
},
PeriodicSchedule: model.PeriodicSchedule{
IntervalSecond: util.Int64Pointer(0),
},
},
PipelineSpec: model.PipelineSpec{
PipelineSpecManifest: v2SpecHelloWorld,
PipelineName: actualJob.PipelineSpec.PipelineName,
RuntimeConfig: model.RuntimeConfig{
Parameters: `{"param1":"world-updated"}`,
PipelineRoot: "job-1-root",
},
},
CreatedAtInSec: 2,
UpdatedAtInSec: 3,
}
expectedJob.Conditions = "STATUS_UNSPECIFIED"
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
}

func TestReportScheduledWorkflowResource_Error(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
Expand Down
89 changes: 43 additions & 46 deletions backend/src/apiserver/server/api_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,13 +654,19 @@ func toApiParametersV1(p string) []*apiv1beta1.Parameter {
// Supports v1beta1 and v2beta1 API.
// Note: returns nil if a parsing error occurs.
func toMapProtoStructParameters(p string) map[string]*structpb.Value {
protoParams := make(map[string]*structpb.Value)
protoParams := make(map[string]*structpb.Value, 0)
if p == "" || p == "null" || p == "[]" {
return protoParams
}
err := json.Unmarshal([]byte(p), &protoParams)
if err != nil {
return nil
if paramsV1 := toApiParametersV1(p); paramsV1 == nil {
return nil
} else {
for _, paramV1 := range paramsV1 {
protoParams[paramV1.Name] = structpb.NewStringValue(paramV1.Value)
}
}
}
return protoParams
}
Expand Down Expand Up @@ -1317,22 +1323,20 @@ func toApiRunV1(r *model.Run) *apiv1beta1.Run {
Error: util.Wrap(errors.New("Failed to parse pipeline spec parameters"), "Failed to convert internal run representation to its v1beta1 API counterpart").Error(),
}
}
var runtimeConfig *apiv1beta1.PipelineSpec_RuntimeConfig
if len(specParams) == 0 {
specParams = nil
}

// v2 RuntimeConfig
runtimeConfig := toApiRuntimeConfigV1(r.PipelineSpec.RuntimeConfig)
if runtimeConfig == nil {
return &apiv1beta1.Run{
Id: r.UUID,
Error: util.Wrap(errors.New("Failed to parse runtime config"), "Failed to convert internal run representation to its v1beta1 API counterpart").Error(),
runtimeConfig = toApiRuntimeConfigV1(r.PipelineSpec.RuntimeConfig)
if runtimeConfig == nil {
return &apiv1beta1.Run{
Id: r.UUID,
Error: util.Wrap(errors.New("Failed to parse runtime config"), "Failed to convert internal run representation to its v1beta1 API counterpart").Error(),
}
}
if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 {
runtimeConfig = nil
}
}
if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 {
runtimeConfig = nil
}

var metrics []*apiv1beta1.RunMetric
if r.Metrics != nil {
metrics = toApiRunMetricsV1(r.Metrics)
Expand Down Expand Up @@ -1448,7 +1452,11 @@ func toApiRun(r *model.Run) *apiv2beta1.Run {
}
}
if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 {
runtimeConfig = nil
if params := toMapProtoStructParameters(r.PipelineSpec.Parameters); len(params) > 0 {
runtimeConfig.Parameters = params
} else {
runtimeConfig = nil
}
}
apiRd := &apiv2beta1.RunDetails{
PipelineContextId: r.RunDetails.PipelineContextId,
Expand Down Expand Up @@ -1841,11 +1849,11 @@ func toModelJob(j interface{}) (*model.Job, error) {
createTime = apiJob.GetCreatedAt().GetSeconds()
updateTime = apiJob.GetUpdatedAt().GetSeconds()

params, err := toModelParameters(apiJob.GetPipelineSpec().GetParameters())
if err != nil {
if params, err := toModelParameters(apiJob.GetPipelineSpec().GetParameters()); err != nil {
return nil, util.Wrap(err, "Failed to convert v1beta1 API recurring run to its internal representation due to parameters parsing error")
} else {
specParams = params
}
specParams = params

cfg, err := toModelRuntimeConfig(apiJob.GetPipelineSpec().GetRuntimeConfig())
if err != nil {
Expand Down Expand Up @@ -2041,21 +2049,22 @@ func toApiJobV1(j *model.Job) *apiv1beta1.Job {
if specParams == nil {
return &apiv1beta1.Job{
Id: j.UUID,
Error: util.NewInternalServerError(util.NewInvalidInputError("Pipeline spec parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(),
Error: util.NewInternalServerError(util.NewInvalidInputError("Pipeline v1 parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(),
}
}
var runtimeConfig *apiv1beta1.PipelineSpec_RuntimeConfig
if len(specParams) == 0 {
specParams = nil
}
runtimeConfig := toApiRuntimeConfigV1(j.PipelineSpec.RuntimeConfig)
if runtimeConfig == nil {
return &apiv1beta1.Job{
Id: j.UUID,
Error: util.NewInternalServerError(util.NewInvalidInputError("Runtime config was not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(),
runtimeConfig = toApiRuntimeConfigV1(j.PipelineSpec.RuntimeConfig)
if runtimeConfig == nil {
return &apiv1beta1.Job{
Id: j.UUID,
Error: util.NewInternalServerError(util.NewInvalidInputError("Runtime config was not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(),
}
}
if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 {
runtimeConfig = nil
}
}
if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 {
runtimeConfig = nil
}
resRefs := toApiResourceReferencesV1(j.ResourceReferences)
if resRefs == nil {
Expand Down Expand Up @@ -2149,31 +2158,19 @@ func toApiJobV1(j *model.Job) *apiv1beta1.Job {
// Supports v2beta1 API.
func toApiRecurringRun(j *model.Job) *apiv2beta1.RecurringRun {
j = j.ToV2()
params := toMapProtoStructParameters(j.PipelineSpec.RuntimeConfig.Parameters)
if params == nil {
return &apiv2beta1.RecurringRun{
RecurringRunId: j.UUID,
Error: util.ToRpcStatus(util.NewInternalServerError(util.NewInvalidInputError("Runtime parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its API counterpart")),
}
}
if len(params) == 0 {
params = toMapProtoStructParameters(j.PipelineSpec.Parameters)
if params == nil {
return &apiv2beta1.RecurringRun{
RecurringRunId: j.UUID,
Error: util.ToRpcStatus(util.NewInternalServerError(util.NewInvalidInputError("Runtime parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its API counterpart")),
}
}
}
runtimeConfig := toApiRuntimeConfig(j.PipelineSpec.RuntimeConfig)
if runtimeConfig == nil {
return &apiv2beta1.RecurringRun{
RecurringRunId: j.UUID,
Error: util.ToRpcStatus(util.NewInternalServerError(util.NewInvalidInputError("Runtime config was not parsed correctly"), "Failed to convert recurring run's internal representation to its API counterpart")),
}
}
if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 {
runtimeConfig = nil
if runtimeConfig == nil || (len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0) {
if params := toMapProtoStructParameters(j.PipelineSpec.Parameters); len(params) > 0 {
runtimeConfig.Parameters = params
} else {
runtimeConfig = nil
}
}

apiRecurringRunV2 := &apiv2beta1.RecurringRun{
Expand Down
35 changes: 34 additions & 1 deletion backend/src/apiserver/server/api_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,7 @@ func TestToApiJob_ErrorParsingField(t *testing.T) {

apiJob := toApiJobV1(modelJob)
assert.Equal(t, "job1", apiJob.Id)
assert.Contains(t, apiJob.Error, "Pipeline spec parameters were not parsed correctly")
assert.Contains(t, apiJob.Error, "Pipeline v1 parameters were not parsed correctly")

apiJob2 := toApiJobV1(modelJob2)
assert.Equal(t, "job2", apiJob2.Id)
Expand Down Expand Up @@ -2073,6 +2073,39 @@ func TestToApiParameters(t *testing.T) {
modelParameters := `[{"name":"param2","value":"world"}]`
actualApiParameters := toApiParametersV1(modelParameters)
assert.Equal(t, expectedApiParameters, actualApiParameters)

expectedApiParameters = []*apiv1beta1.Parameter{
{
Name: "pipeline-root",
Value: "gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}",
},
{
Name: "version",
Value: "2",
},
}
modelParameters = `[{"name":"pipeline-root","value":"gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}"},{"name":"version","value":"2"}]`
actualApiParameters = toApiParametersV1(modelParameters)
assert.Equal(t, expectedApiParameters, actualApiParameters)

}

func TestToMapProtoStructParameters(t *testing.T) {
expectedApiParameters := map[string]*structpb.Value{
"param2": structpb.NewStringValue("world"),
}
modelParameters := `[{"name":"param2","value":"world"}]`
actualApiParameters := toMapProtoStructParameters(modelParameters)
assert.Equal(t, expectedApiParameters, actualApiParameters)

expectedApiParameters = map[string]*structpb.Value{
"pipeline-root": structpb.NewStringValue("gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}"),
"version": structpb.NewStringValue("2"),
}
modelParameters = `[{"name":"pipeline-root","value":"gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}"},{"name":"version","value":"2"}]`
actualApiParameters = toMapProtoStructParameters(modelParameters)
assert.Equal(t, expectedApiParameters, actualApiParameters)

}

func TestToApiRuntimeConfigV1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/server/report_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *ReportServer) ReportWorkflow(ctx context.Context,
return s.reportWorkflow(ctx, request.GetWorkflow())
}

// Reports a schduled workflow.
// Reports a scheduled workflow.
func (s *ReportServer) reportScheduledWorkflow(ctx context.Context, swf string) (*empty.Empty, error) {
scheduledWorkflow, err := validateReportScheduledWorkflowRequest(swf)
if err != nil {
Expand Down
Loading

0 comments on commit ce096dd

Please sign in to comment.