From ce096dd3bca31775abb6966610d8f222f7c1e120 Mon Sep 17 00:00:00 2001 From: gkcalat <35157096+gkcalat@users.noreply.github.com> Date: Mon, 10 Apr 2023 16:35:54 -0700 Subject: [PATCH] fix(backend): Fix missing parameters in jobs and recurring runs. Closes #9012. (#9118) * Add pipeline_id to run and recurring run protos * Add pipeline_id to run and recurring_run * Fix params in recurring run * Add support for v1 and v2 parameters in runs and jobs * Fix convertion issues * Update a unit-test * Revert change of the order in sql * Address comments --- .../apiserver/resource/resource_manager.go | 6 +- .../resource/resource_manager_test.go | 125 +++++++++++++++++- backend/src/apiserver/server/api_converter.go | 89 ++++++------- .../apiserver/server/api_converter_test.go | 35 ++++- backend/src/apiserver/server/report_server.go | 2 +- backend/src/apiserver/storage/job_store.go | 20 ++- .../src/apiserver/storage/job_store_test.go | 11 +- .../src/apiserver/template/argo_template.go | 9 +- backend/src/apiserver/template/template.go | 26 +++- .../src/apiserver/template/template_test.go | 4 + backend/src/apiserver/template/v2_template.go | 8 +- backend/src/common/util/scheduled_workflow.go | 51 ++++++- .../common/util/scheduled_workflow_test.go | 64 +++++++-- 13 files changed, 370 insertions(+), 80 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 04b75e310b5..65cc13cc9b7 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1143,11 +1143,9 @@ func (r *ResourceManager) ReportWorkflowResource(ctx context.Context, execSpec u namespace = execSpec.ExecutionNamespace() } // Scheduled time equals created time if it is not specified - var scheduledTimeInSec int64 - if execSpec.ScheduledAtInSecOr0() == 0 { + scheduledTimeInSec := execSpec.ScheduledAtInSecOr0() + if scheduledTimeInSec == 0 { scheduledTimeInSec = objMeta.CreationTimestamp.Unix() - } else { - scheduledTimeInSec = execSpec.ScheduledAtInSecOr0() } run = &model.Run{ UUID: runId, diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index e19b4c376cf..11d39333e50 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -3072,7 +3072,6 @@ func TestReportScheduledWorkflowResource_Success(t *testing.T) { }, PipelineSpec: model.PipelineSpec{ WorkflowSpecManifest: testWorkflow.ToStringForStore(), - Parameters: "[]", PipelineSpecManifest: actualJob.PipelineSpec.PipelineSpecManifest, PipelineName: actualJob.PipelineSpec.PipelineName, }, @@ -3083,6 +3082,130 @@ func TestReportScheduledWorkflowResource_Success(t *testing.T) { assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) } +func TestReportScheduledWorkflowResource_Success_withParamsV1(t *testing.T) { + store, manager, job := initWithJob(t) + defer store.Close() + // report scheduled workflow + swf := util.NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + TypeMeta: v1.TypeMeta{ + APIVersion: "kubeflow.org/v1beta1", + Kind: "ScheduledWorkflow", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "MY_NAME", + Namespace: "MY_NAMESPACE", + UID: types.UID(job.UUID), + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Workflow: &swfapi.WorkflowResource{ + Parameters: []swfapi.Parameter{ + { + Name: "param_v1", + Value: "value_v1", + }, + }, + }, + }, + }) + err := manager.ReportScheduledWorkflowResource(swf) + assert.Nil(t, err) + + actualJob, err := manager.GetJob(job.UUID) + assert.Nil(t, err) + + expectedJob := &model.Job{ + K8SName: "MY_NAME", + DisplayName: "j1", + Namespace: actualJob.Namespace, + ExperimentId: actualJob.ExperimentId, + ServiceAccount: "pipeline-runner", + Enabled: false, + UUID: actualJob.UUID, + Conditions: "STATUS_UNSPECIFIED", + Trigger: model.Trigger{ + CronSchedule: model.CronSchedule{ + Cron: util.StringPointer(""), + }, + PeriodicSchedule: model.PeriodicSchedule{ + IntervalSecond: util.Int64Pointer(0), + }, + }, + PipelineSpec: model.PipelineSpec{ + Parameters: `[{"name":"param_v1","value":"value_v1"}]`, + WorkflowSpecManifest: testWorkflow.ToStringForStore(), + PipelineSpecManifest: actualJob.PipelineSpec.PipelineSpecManifest, + PipelineName: actualJob.PipelineSpec.PipelineName, + }, + CreatedAtInSec: 2, + UpdatedAtInSec: 3, + } + expectedJob.Conditions = "STATUS_UNSPECIFIED" + assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) +} + +func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing.T) { + store, manager, job := initWithJobV2(t) + defer store.Close() + // report scheduled workflow + swf := util.NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + TypeMeta: v1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "updated_name", + Namespace: "ns1", + UID: types.UID(job.UUID), + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Workflow: &swfapi.WorkflowResource{ + Parameters: []swfapi.Parameter{ + { + Name: "param1", + Value: "\"world-updated\"", + }, + }, + }, + }, + }) + err := manager.ReportScheduledWorkflowResource(swf) + assert.Nil(t, err) + + actualJob, err := manager.GetJob(job.UUID) + assert.Nil(t, err) + + expectedJob := &model.Job{ + K8SName: "updated_name", + DisplayName: "j1", + Namespace: "ns1", + ExperimentId: job.ExperimentId, + ServiceAccount: "pipeline-runner", + Enabled: false, + UUID: actualJob.UUID, + Conditions: "STATUS_UNSPECIFIED", + Trigger: model.Trigger{ + CronSchedule: model.CronSchedule{ + Cron: util.StringPointer(""), + }, + PeriodicSchedule: model.PeriodicSchedule{ + IntervalSecond: util.Int64Pointer(0), + }, + }, + PipelineSpec: model.PipelineSpec{ + PipelineSpecManifest: v2SpecHelloWorld, + PipelineName: actualJob.PipelineSpec.PipelineName, + RuntimeConfig: model.RuntimeConfig{ + Parameters: `{"param1":"world-updated"}`, + PipelineRoot: "job-1-root", + }, + }, + CreatedAtInSec: 2, + UpdatedAtInSec: 3, + } + expectedJob.Conditions = "STATUS_UNSPECIFIED" + assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) +} + func TestReportScheduledWorkflowResource_Error(t *testing.T) { store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close() diff --git a/backend/src/apiserver/server/api_converter.go b/backend/src/apiserver/server/api_converter.go index 51a2ad547a2..919fea213e4 100644 --- a/backend/src/apiserver/server/api_converter.go +++ b/backend/src/apiserver/server/api_converter.go @@ -654,13 +654,19 @@ func toApiParametersV1(p string) []*apiv1beta1.Parameter { // Supports v1beta1 and v2beta1 API. // Note: returns nil if a parsing error occurs. func toMapProtoStructParameters(p string) map[string]*structpb.Value { - protoParams := make(map[string]*structpb.Value) + protoParams := make(map[string]*structpb.Value, 0) if p == "" || p == "null" || p == "[]" { return protoParams } err := json.Unmarshal([]byte(p), &protoParams) if err != nil { - return nil + if paramsV1 := toApiParametersV1(p); paramsV1 == nil { + return nil + } else { + for _, paramV1 := range paramsV1 { + protoParams[paramV1.Name] = structpb.NewStringValue(paramV1.Value) + } + } } return protoParams } @@ -1317,22 +1323,20 @@ func toApiRunV1(r *model.Run) *apiv1beta1.Run { Error: util.Wrap(errors.New("Failed to parse pipeline spec parameters"), "Failed to convert internal run representation to its v1beta1 API counterpart").Error(), } } + var runtimeConfig *apiv1beta1.PipelineSpec_RuntimeConfig if len(specParams) == 0 { specParams = nil - } - - // v2 RuntimeConfig - runtimeConfig := toApiRuntimeConfigV1(r.PipelineSpec.RuntimeConfig) - if runtimeConfig == nil { - return &apiv1beta1.Run{ - Id: r.UUID, - Error: util.Wrap(errors.New("Failed to parse runtime config"), "Failed to convert internal run representation to its v1beta1 API counterpart").Error(), + runtimeConfig = toApiRuntimeConfigV1(r.PipelineSpec.RuntimeConfig) + if runtimeConfig == nil { + return &apiv1beta1.Run{ + Id: r.UUID, + Error: util.Wrap(errors.New("Failed to parse runtime config"), "Failed to convert internal run representation to its v1beta1 API counterpart").Error(), + } + } + if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 { + runtimeConfig = nil } } - if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 { - runtimeConfig = nil - } - var metrics []*apiv1beta1.RunMetric if r.Metrics != nil { metrics = toApiRunMetricsV1(r.Metrics) @@ -1448,7 +1452,11 @@ func toApiRun(r *model.Run) *apiv2beta1.Run { } } if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 { - runtimeConfig = nil + if params := toMapProtoStructParameters(r.PipelineSpec.Parameters); len(params) > 0 { + runtimeConfig.Parameters = params + } else { + runtimeConfig = nil + } } apiRd := &apiv2beta1.RunDetails{ PipelineContextId: r.RunDetails.PipelineContextId, @@ -1841,11 +1849,11 @@ func toModelJob(j interface{}) (*model.Job, error) { createTime = apiJob.GetCreatedAt().GetSeconds() updateTime = apiJob.GetUpdatedAt().GetSeconds() - params, err := toModelParameters(apiJob.GetPipelineSpec().GetParameters()) - if err != nil { + if params, err := toModelParameters(apiJob.GetPipelineSpec().GetParameters()); err != nil { return nil, util.Wrap(err, "Failed to convert v1beta1 API recurring run to its internal representation due to parameters parsing error") + } else { + specParams = params } - specParams = params cfg, err := toModelRuntimeConfig(apiJob.GetPipelineSpec().GetRuntimeConfig()) if err != nil { @@ -2041,21 +2049,22 @@ func toApiJobV1(j *model.Job) *apiv1beta1.Job { if specParams == nil { return &apiv1beta1.Job{ Id: j.UUID, - Error: util.NewInternalServerError(util.NewInvalidInputError("Pipeline spec parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(), + Error: util.NewInternalServerError(util.NewInvalidInputError("Pipeline v1 parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(), } } + var runtimeConfig *apiv1beta1.PipelineSpec_RuntimeConfig if len(specParams) == 0 { specParams = nil - } - runtimeConfig := toApiRuntimeConfigV1(j.PipelineSpec.RuntimeConfig) - if runtimeConfig == nil { - return &apiv1beta1.Job{ - Id: j.UUID, - Error: util.NewInternalServerError(util.NewInvalidInputError("Runtime config was not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(), + runtimeConfig = toApiRuntimeConfigV1(j.PipelineSpec.RuntimeConfig) + if runtimeConfig == nil { + return &apiv1beta1.Job{ + Id: j.UUID, + Error: util.NewInternalServerError(util.NewInvalidInputError("Runtime config was not parsed correctly"), "Failed to convert recurring run's internal representation to its v1beta1 API counterpart").Error(), + } + } + if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 { + runtimeConfig = nil } - } - if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 { - runtimeConfig = nil } resRefs := toApiResourceReferencesV1(j.ResourceReferences) if resRefs == nil { @@ -2149,22 +2158,6 @@ func toApiJobV1(j *model.Job) *apiv1beta1.Job { // Supports v2beta1 API. func toApiRecurringRun(j *model.Job) *apiv2beta1.RecurringRun { j = j.ToV2() - params := toMapProtoStructParameters(j.PipelineSpec.RuntimeConfig.Parameters) - if params == nil { - return &apiv2beta1.RecurringRun{ - RecurringRunId: j.UUID, - Error: util.ToRpcStatus(util.NewInternalServerError(util.NewInvalidInputError("Runtime parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its API counterpart")), - } - } - if len(params) == 0 { - params = toMapProtoStructParameters(j.PipelineSpec.Parameters) - if params == nil { - return &apiv2beta1.RecurringRun{ - RecurringRunId: j.UUID, - Error: util.ToRpcStatus(util.NewInternalServerError(util.NewInvalidInputError("Runtime parameters were not parsed correctly"), "Failed to convert recurring run's internal representation to its API counterpart")), - } - } - } runtimeConfig := toApiRuntimeConfig(j.PipelineSpec.RuntimeConfig) if runtimeConfig == nil { return &apiv2beta1.RecurringRun{ @@ -2172,8 +2165,12 @@ func toApiRecurringRun(j *model.Job) *apiv2beta1.RecurringRun { Error: util.ToRpcStatus(util.NewInternalServerError(util.NewInvalidInputError("Runtime config was not parsed correctly"), "Failed to convert recurring run's internal representation to its API counterpart")), } } - if len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0 { - runtimeConfig = nil + if runtimeConfig == nil || (len(runtimeConfig.GetParameters()) == 0 && len(runtimeConfig.GetPipelineRoot()) == 0) { + if params := toMapProtoStructParameters(j.PipelineSpec.Parameters); len(params) > 0 { + runtimeConfig.Parameters = params + } else { + runtimeConfig = nil + } } apiRecurringRunV2 := &apiv2beta1.RecurringRun{ diff --git a/backend/src/apiserver/server/api_converter_test.go b/backend/src/apiserver/server/api_converter_test.go index 8b7e75f8772..1719ace361c 100644 --- a/backend/src/apiserver/server/api_converter_test.go +++ b/backend/src/apiserver/server/api_converter_test.go @@ -1628,7 +1628,7 @@ func TestToApiJob_ErrorParsingField(t *testing.T) { apiJob := toApiJobV1(modelJob) assert.Equal(t, "job1", apiJob.Id) - assert.Contains(t, apiJob.Error, "Pipeline spec parameters were not parsed correctly") + assert.Contains(t, apiJob.Error, "Pipeline v1 parameters were not parsed correctly") apiJob2 := toApiJobV1(modelJob2) assert.Equal(t, "job2", apiJob2.Id) @@ -2073,6 +2073,39 @@ func TestToApiParameters(t *testing.T) { modelParameters := `[{"name":"param2","value":"world"}]` actualApiParameters := toApiParametersV1(modelParameters) assert.Equal(t, expectedApiParameters, actualApiParameters) + + expectedApiParameters = []*apiv1beta1.Parameter{ + { + Name: "pipeline-root", + Value: "gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}", + }, + { + Name: "version", + Value: "2", + }, + } + modelParameters = `[{"name":"pipeline-root","value":"gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}"},{"name":"version","value":"2"}]` + actualApiParameters = toApiParametersV1(modelParameters) + assert.Equal(t, expectedApiParameters, actualApiParameters) + +} + +func TestToMapProtoStructParameters(t *testing.T) { + expectedApiParameters := map[string]*structpb.Value{ + "param2": structpb.NewStringValue("world"), + } + modelParameters := `[{"name":"param2","value":"world"}]` + actualApiParameters := toMapProtoStructParameters(modelParameters) + assert.Equal(t, expectedApiParameters, actualApiParameters) + + expectedApiParameters = map[string]*structpb.Value{ + "pipeline-root": structpb.NewStringValue("gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}"), + "version": structpb.NewStringValue("2"), + } + modelParameters = `[{"name":"pipeline-root","value":"gs://my-bucket/tfx_taxi_simple/{{workflow.uid}}"},{"name":"version","value":"2"}]` + actualApiParameters = toMapProtoStructParameters(modelParameters) + assert.Equal(t, expectedApiParameters, actualApiParameters) + } func TestToApiRuntimeConfigV1(t *testing.T) { diff --git a/backend/src/apiserver/server/report_server.go b/backend/src/apiserver/server/report_server.go index bb58392eca9..8ae30ec2efe 100644 --- a/backend/src/apiserver/server/report_server.go +++ b/backend/src/apiserver/server/report_server.go @@ -74,7 +74,7 @@ func (s *ReportServer) ReportWorkflow(ctx context.Context, return s.reportWorkflow(ctx, request.GetWorkflow()) } -// Reports a schduled workflow. +// Reports a scheduled workflow. func (s *ReportServer) reportScheduledWorkflow(ctx context.Context, swf string) (*empty.Empty, error) { scheduledWorkflow, err := validateReportScheduledWorkflowRequest(swf) if err != nil { diff --git a/backend/src/apiserver/storage/job_store.go b/backend/src/apiserver/storage/job_store.go index 7c25b2a48cb..53c2b9f2794 100644 --- a/backend/src/apiserver/storage/job_store.go +++ b/backend/src/apiserver/storage/job_store.go @@ -369,7 +369,7 @@ func (s *JobStore) CreateJob(j *model.Job) (*model.Job, error) { "PipelineName": j.PipelineSpec.PipelineName, "PipelineSpecManifest": j.PipelineSpec.PipelineSpecManifest, "WorkflowSpecManifest": j.PipelineSpec.WorkflowSpecManifest, - "Parameters": j.Parameters, + "Parameters": j.PipelineSpec.Parameters, "RuntimeParameters": j.PipelineSpec.RuntimeConfig.Parameters, "PipelineRoot": j.PipelineSpec.RuntimeConfig.PipelineRoot, "ExperimentUUID": j.ExperimentId, @@ -434,8 +434,7 @@ func (s *JobStore) UpdateJob(swf *util.ScheduledWorkflow) error { if err != nil { return err } - - sql, args, err := sq. + updateSql := sq. Update("jobs"). SetMap(sq.Eq{ "Name": swf.Name, @@ -445,7 +444,6 @@ func (s *JobStore) UpdateJob(swf *util.ScheduledWorkflow) error { "Conditions": model.StatusState(swf.ConditionSummary()).ToString(), "MaxConcurrency": swf.MaxConcurrencyOr0(), "NoCatchup": swf.NoCatchupOrFalse(), - "Parameters": parameters, "UpdatedAtInSec": now, "CronScheduleStartTimeInSec": PointerToNullInt64(swf.CronScheduleStartTimeInSecOrNull()), "CronScheduleEndTimeInSec": PointerToNullInt64(swf.CronScheduleEndTimeInSecOrNull()), @@ -453,9 +451,17 @@ func (s *JobStore) UpdateJob(swf *util.ScheduledWorkflow) error { "PeriodicScheduleStartTimeInSec": PointerToNullInt64(swf.PeriodicScheduleStartTimeInSecOrNull()), "PeriodicScheduleEndTimeInSec": PointerToNullInt64(swf.PeriodicScheduleEndTimeInSecOrNull()), "IntervalSecond": swf.IntervalSecondOr0(), - }). - Where(sq.Eq{"UUID": string(swf.UID)}). - ToSql() + }) + if len(parameters) > 0 { + if swf.GetVersion() == util.SWFv1 { + updateSql = updateSql.SetMap(sq.Eq{"Parameters": parameters}) + } else if swf.GetVersion() == util.SWFv2 { + updateSql = updateSql.SetMap(sq.Eq{"RuntimeParameters": parameters}) + } else { + return util.NewInternalServerError(util.NewInvalidInputError("ScheduledWorkflow has an invalid version: %v", swf.GetVersion()), "Failed to update job %v", swf.UID) + } + } + sql, args, err := updateSql.Where(sq.Eq{"UUID": string(swf.UID)}).ToSql() if err != nil { return util.NewInternalServerError(err, "Error while creating query to update job with scheduled workflow: %v: %+v", diff --git a/backend/src/apiserver/storage/job_store_test.go b/backend/src/apiserver/storage/job_store_test.go index 7834f8c36d9..eb0ca34054f 100644 --- a/backend/src/apiserver/storage/job_store_test.go +++ b/backend/src/apiserver/storage/job_store_test.go @@ -474,6 +474,7 @@ func TestCreateJob(t *testing.T) { K8SName: "pp1", Namespace: "n1", PipelineSpec: model.PipelineSpec{ + Parameters: `[{"name":"string1","value":"one"},{"name":"number2","value":"2"}]`, PipelineId: pipeline.UUID, PipelineName: "p1", }, @@ -491,6 +492,7 @@ func TestCreateJob(t *testing.T) { K8SName: "pp1", Namespace: "n1", PipelineSpec: model.PipelineSpec{ + Parameters: `[{"name":"string1","value":"one"},{"name":"number2","value":"2"}]`, PipelineId: pipeline.UUID, PipelineName: "p1", }, @@ -502,6 +504,10 @@ func TestCreateJob(t *testing.T) { jobExpected = jobExpected.ToV1() assert.Equal(t, jobExpected, job.ToV1(), "Got unexpected jobs") + newJob, err := jobStore.GetJob(job.UUID) + assert.Nil(t, err) + assert.Equal(t, jobExpected, newJob.ToV1(), "Got unexpected jobs") + // Check resource reference exists resourceReferenceStore := NewResourceReferenceStore(db) r, err := resourceReferenceStore.GetResourceReference("1", model.JobResourceType, model.ExperimentResourceType) @@ -732,6 +738,10 @@ func TestUpdateJob_Success(t *testing.T) { assert.Equal(t, jobExpected.ToV1(), job.ToV1()) swf := util.NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v1beta1", + Kind: "ScheduledWorkflow", + }, ObjectMeta: metav1.ObjectMeta{ Name: "MY_NAME", Namespace: "n1", @@ -865,7 +875,6 @@ func TestUpdateJob_MostlyEmptySpec(t *testing.T) { PipelineSpec: model.PipelineSpec{ PipelineId: DefaultFakePipelineIdTwo, PipelineName: "p1", - Parameters: "[]", }, Trigger: model.Trigger{ CronSchedule: model.CronSchedule{ diff --git a/backend/src/apiserver/template/argo_template.go b/backend/src/apiserver/template/argo_template.go index e49266177c1..638642b4c2f 100644 --- a/backend/src/apiserver/template/argo_template.go +++ b/backend/src/apiserver/template/argo_template.go @@ -122,9 +122,10 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu // TODO: Fix the components to explicitly declare the artifacts they really output. workflow.PatchTemplateOutputArtifacts() - swfParameters, err := modelToCRDParameters(modelJob.RuntimeConfig.Parameters) + // We assume that v1 Argo template use v1 parameters ignoring runtime config + swfParameters, err := stringArrayToCRDParameters(modelJob.Parameters) if err != nil { - return nil, util.Wrap(err, "Failed to convert model parameters to CRD parameters") + return nil, util.Wrap(err, "Failed to convert v1 parameters to CRD parameters") } crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) if err != nil { @@ -132,6 +133,10 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu } scheduledWorkflow := &scheduledworkflow.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v1beta1", + Kind: "ScheduledWorkflow", + }, ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, Spec: scheduledworkflow.ScheduledWorkflowSpec{ Enabled: modelJob.Enabled, diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 53be481cf97..08dcbcae959 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -143,7 +143,11 @@ func modelToPipelineJobRuntimeConfig(modelRuntimeConfig *model.RuntimeConfig) (* return runtimeConfig, nil } -func modelToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, error) { +// Converts serialized runtime config's parameters to []scheduledworkflow.Parameter. +// Assumes that the serialized parameters will take a form of +// map[string]*structpb.Value, which works for runtimeConfig.Parameters such as +// {"param1":"value1","param2":"value2"}. +func stringMapToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, error) { var swParams []scheduledworkflow.Parameter var parameters map[string]*structpb.Value if modelParams == "" { @@ -167,6 +171,26 @@ func modelToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, er return swParams, nil } +// Converts serialized v1 parameters to []scheduledworkflow.Parameter. +// Assumes that the serialized parameters will take a form of +// []map[string]string, which works for legacy v1 parameters such as +// [{"name":"param1","value":"value1"},{"name":"param2","value":"value2"}]. +func stringArrayToCRDParameters(modelParameters string) ([]scheduledworkflow.Parameter, error) { + var paramsMapList []*map[string]string + var desiredParams []scheduledworkflow.Parameter + if modelParameters == "" { + return desiredParams, nil + } + err := json.Unmarshal([]byte(modelParameters), ¶msMapList) + if err != nil { + return nil, err + } + for _, param := range paramsMapList { + desiredParams = append(desiredParams, scheduledworkflow.Parameter{Name: (*param)["name"], Value: (*param)["value"]}) + } + return desiredParams, nil +} + func modelToParametersMap(modelParameters string) (map[string]string, error) { var paramsMapList []*map[string]string desiredParamsMap := make(map[string]string) diff --git a/backend/src/apiserver/template/template_test.go b/backend/src/apiserver/template/template_test.go index 594696f7e74..7ef551031b9 100644 --- a/backend/src/apiserver/template/template_test.go +++ b/backend/src/apiserver/template/template_test.go @@ -201,6 +201,10 @@ func TestScheduledWorkflow(t *testing.T) { } expectedScheduledWorkflow := scheduledworkflow.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, ObjectMeta: metav1.ObjectMeta{GenerateName: "name1"}, Spec: scheduledworkflow.ScheduledWorkflowSpec{ Enabled: true, diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index b699c9d5282..a1447e3edd8 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -87,9 +87,9 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche if err != nil { return nil, util.Wrap(err, "Create job failed") } - parameters, err := modelToCRDParameters(modelJob.RuntimeConfig.Parameters) + parameters, err := stringMapToCRDParameters(modelJob.RuntimeConfig.Parameters) if err != nil { - return nil, util.Wrap(err, "Converting model.Job parameters to CDR parameters failed") + return nil, util.Wrap(err, "Converting runtime config's parameters to CDR parameters failed") } crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) if err != nil { @@ -97,6 +97,10 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } scheduledWorkflow := &scheduledworkflow.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, Spec: scheduledworkflow.ScheduledWorkflowSpec{ Enabled: modelJob.Enabled, diff --git a/backend/src/common/util/scheduled_workflow.go b/backend/src/common/util/scheduled_workflow.go index 14393b14f18..07b292d7194 100644 --- a/backend/src/common/util/scheduled_workflow.go +++ b/backend/src/common/util/scheduled_workflow.go @@ -15,11 +15,27 @@ package util import ( + "strings" + "github.com/golang/glog" swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" + "google.golang.org/protobuf/types/known/structpb" "k8s.io/apimachinery/pkg/util/json" ) +type ScheduledWorkflowType string + +const ( + SWFv1 ScheduledWorkflowType = "v1beta1" + SWFv2 ScheduledWorkflowType = "v2beta1" + SWFlegacy ScheduledWorkflowType = "legacy" + SWFunknown ScheduledWorkflowType = "Unknown" + + apiVersionV1 = "kubeflow.org/v1beta1" + apiVersionV2 = "kubeflow.org/v2beta1" + swfKind = "ScheduledWorkflow" +) + // ScheduledWorkflow is a type to help manipulate ScheduledWorkflow objects. type ScheduledWorkflow struct { *swfapi.ScheduledWorkflow @@ -102,13 +118,29 @@ func (s *ScheduledWorkflow) ConditionSummary() string { } func (s *ScheduledWorkflow) ParametersAsString() (string, error) { - var params []swfapi.Parameter + var params interface{} if s.ScheduledWorkflow.Spec.Workflow == nil { - params = make([]swfapi.Parameter, 0) - } else { + return "", nil + } + switch s.GetVersion() { + case SWFv1: params = s.ScheduledWorkflow.Spec.Workflow.Parameters + case SWFv2: + paramsMap := make(map[string]*structpb.Value, 0) + for _, param := range s.ScheduledWorkflow.Spec.Workflow.Parameters { + var protoValue structpb.Value + err := json.Unmarshal([]byte(param.Value), &protoValue) + if err != nil { + return "", err + } + paramsMap[param.Name] = &protoValue + } + params = paramsMap + case SWFlegacy: + return "", NewInvalidInputError("Found a ScheduledWorkflow with empty type metadata. ScheduledWorkflow must have valid APIVersion and Kind. ObjectMeta: %v", s.ObjectMeta) + default: + return "", NewInternalServerError(NewInvalidInputError("ScheduledWorkflow has invalid type metadata: %v", s.TypeMeta), "Failed to serialize parameters as string for ScheduledWorkflow %v", s.ObjectMeta) } - paramsBytes, err := json.Marshal(params) if err != nil { return "", NewInvalidInputError( @@ -126,3 +158,14 @@ func (s *ScheduledWorkflow) ToStringForStore() string { } return string(swf) } + +func (s *ScheduledWorkflow) GetVersion() ScheduledWorkflowType { + if strings.HasPrefix(s.APIVersion, apiVersionV1) && s.Kind == swfKind { + return SWFv1 + } else if strings.HasPrefix(s.APIVersion, apiVersionV2) && s.Kind == swfKind { + return SWFv2 + } else if s.APIVersion == "" && s.Kind == "" { + return SWFlegacy + } + return SWFunknown +} diff --git a/backend/src/common/util/scheduled_workflow_test.go b/backend/src/common/util/scheduled_workflow_test.go index 9a0dc7d0b00..32dd4ada18d 100644 --- a/backend/src/common/util/scheduled_workflow_test.go +++ b/backend/src/common/util/scheduled_workflow_test.go @@ -129,32 +129,76 @@ func TestScheduledWorkflow_ParametersAsString(t *testing.T) { }) assert.Nil(t, err) - workflow := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + // v2 runtime config's string parameter + workflowV2 := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, Spec: swfapi.ScheduledWorkflowSpec{ Workflow: &swfapi.WorkflowResource{ Parameters: []swfapi.Parameter{ - {Name: "PARAM1", Value: "NEW_VALUE1"}, - {Name: "PARAM2", Value: "NEW_VALUE2"}, + {Name: "STRING_PARAM1", Value: "\"ONE\""}, }, Spec: string(spec), }, }, }) - - result, err := workflow.ParametersAsString() + resultV2, err := workflowV2.ParametersAsString() assert.Nil(t, err) + assert.Equal(t, + "{\"STRING_PARAM1\":\"ONE\"}", + resultV2) + // v2 runtime config's numeric parameter + workflowV2 = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Workflow: &swfapi.WorkflowResource{ + Parameters: []swfapi.Parameter{ + {Name: "NUMERIC_PARAM2", Value: "2"}, + }, + Spec: string(spec), + }, + }, + }) + resultV2, err = workflowV2.ParametersAsString() + assert.Nil(t, err) assert.Equal(t, - "[{\"name\":\"PARAM1\",\"value\":\"NEW_VALUE1\"},{\"name\":\"PARAM2\",\"value\":\"NEW_VALUE2\"}]", - result) + "{\"NUMERIC_PARAM2\":2}", + resultV2) + // v1 parameters + workflowV1 := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v1beta1", + Kind: "ScheduledWorkflow", + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Workflow: &swfapi.WorkflowResource{ + Parameters: []swfapi.Parameter{ + {Name: "PARAM1", Value: "one"}, + {Name: "PARAM2", Value: "2"}, + }, + Spec: string(spec), + }, + }, + }) + resultV1, err := workflowV1.ParametersAsString() + assert.Nil(t, err) + assert.Equal(t, + `[{"name":"PARAM1","value":"one"},{"name":"PARAM2","value":"2"}]`, + resultV1) // No params - workflow = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + workflow := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ Spec: swfapi.ScheduledWorkflowSpec{}, }) - result, err = workflow.ParametersAsString() + result, err := workflow.ParametersAsString() assert.Nil(t, err) - assert.Equal(t, "[]", result) + assert.Equal(t, "", result) }