Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runs and jobs can be created from pipeline version #2445

Merged
merged 42 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
087f2ab
add version api
jingzhang36 Oct 9, 2019
3972ab9
unit tests
jingzhang36 Oct 9, 2019
e2c2def
remove debug fmt
jingzhang36 Oct 9, 2019
b329fe0
remove unused func
jingzhang36 Oct 10, 2019
a488eef
remove another unused method
jingzhang36 Oct 10, 2019
c97d39c
formatting
jingzhang36 Oct 10, 2019
be61038
remove unused consts
jingzhang36 Oct 10, 2019
20de892
some comments
jingzhang36 Oct 10, 2019
386e6af
build
jingzhang36 Oct 11, 2019
cd8043d
unit tests
jingzhang36 Oct 11, 2019
380a302
unit tests
jingzhang36 Oct 11, 2019
546e9d3
formatting
jingzhang36 Oct 11, 2019
22404e8
unit tests
jingzhang36 Oct 11, 2019
5d5e277
run from pipeline version
jingzhang36 Oct 12, 2019
57aaa3e
pipeline version as resource type
jingzhang36 Oct 12, 2019
01ce9d0
run store and resource reference store
jingzhang36 Oct 12, 2019
2a4fa11
formatting and removing debug traces
jingzhang36 Oct 14, 2019
0743ed7
run server test
jingzhang36 Oct 14, 2019
26efcf3
job created from pipeline version
jingzhang36 Oct 14, 2019
9c20175
variable names
jingzhang36 Oct 15, 2019
4802241
address comments
jingzhang36 Oct 15, 2019
f26e4b3
Get pipeline version template is used on pipeline details page, which…
jingzhang36 Oct 15, 2019
35880a3
a temp revert
jingzhang36 Oct 15, 2019
9408182
address comment
jingzhang36 Oct 16, 2019
48f0697
address comment
jingzhang36 Oct 16, 2019
9ed59b8
add comment
jingzhang36 Oct 16, 2019
f83218f
Merge remote-tracking branch 'origin/master' into api-version
jingzhang36 Oct 17, 2019
ba0cae8
Merge remote-tracking branch 'origin/master' into api-run
jingzhang36 Oct 17, 2019
b471d7f
Merge branch 'api-version' into api-run
jingzhang36 Oct 17, 2019
51de4b1
Merge remote-tracking branch 'origin/master' into api-run
jingzhang36 Oct 18, 2019
5b13d8e
get pipeline version template
jingzhang36 Oct 20, 2019
7167db5
verify pipeline version in resource reference
jingzhang36 Oct 20, 2019
95b4925
add unit test for create run from pipeline version
jingzhang36 Oct 21, 2019
b3f2c04
unit test for create job from pipeline version
jingzhang36 Oct 21, 2019
3246b70
Merge remote-tracking branch 'origin/master' into runs
jingzhang36 Oct 21, 2019
e5507ec
remove some comments
jingzhang36 Oct 21, 2019
00b7890
Merge remote-tracking branch 'origin/master' into runs
jingzhang36 Oct 23, 2019
f4b841f
reformat
jingzhang36 Oct 23, 2019
9ddc808
reformat again
jingzhang36 Oct 23, 2019
91c2a0c
Merge remote-tracking branch 'origin/master' into runs
jingzhang36 Oct 24, 2019
7db0e36
Remove an unrelated change
jingzhang36 Oct 24, 2019
4b129f0
change method name
jingzhang36 Oct 25, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions backend/api/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ service PipelineService {
// delete: "/apis/v1beta1/pipeline_versions/{version_id}"
// };
}

rpc GetPipelineVersionTemplate(GetPipelineVersionTemplateRequest) returns (GetTemplateResponse) {
// TODO(jingzhang36): uncomment when exposing this API method.
// option (google.api.http) = {
// get: "/apis/v1beta1/pipeline_versions/{version_id}/templates"
// };
}
}

message Url {
Expand Down Expand Up @@ -241,7 +248,7 @@ message Pipeline {
// version is used as default. (In the future, if desired by customers, we
// can allow them to set default version.)
// TODO(jingzhang36): expose this in API pipeline definition with FE changes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove todo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yes.

// PipelineVersion default_version = 8;
PipelineVersion default_version = 8;
}

message PipelineVersion {
Expand All @@ -268,4 +275,4 @@ message PipelineVersion {
// Input. Required. E.g., specify which pipeline this pipeline version belongs
// to.
repeated ResourceReference resource_references = 7;
}
}
1 change: 1 addition & 0 deletions backend/api/resource_reference.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum ResourceType {
EXPERIMENT = 1;
JOB = 2;
PIPELINE = 3;
PIPELINE_VERSION = 4;
}

enum Relationship {
Expand Down
11 changes: 7 additions & 4 deletions backend/src/apiserver/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ type ResourceType string
type Relationship string

const (
Experiment ResourceType = "Experiment"
Job ResourceType = "Job"
Run ResourceType = "Run"
Pipeline ResourceType = "pipeline"
Experiment ResourceType = "Experiment"
Job ResourceType = "Job"
Run ResourceType = "Run"
Pipeline ResourceType = "pipeline"
PipelineVersion ResourceType = "PipelineVersion"
)

const (
Expand All @@ -40,6 +41,8 @@ func ToModelResourceType(apiType api.ResourceType) (ResourceType, error) {
return Experiment, nil
case api.ResourceType_JOB:
return Job, nil
case api.ResourceType_PIPELINE_VERSION:
return PipelineVersion, nil
default:
return "", util.NewInvalidInputError("Unsupported resource type: %s", api.ResourceType_name[int32(apiType)])
}
Expand Down
26 changes: 16 additions & 10 deletions backend/src/apiserver/resource/model_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ func (r *ResourceManager) ToModelRunMetric(metric *api.RunMetric, runUUID string
// The input run might not contain workflowSpecManifest, but instead a pipeline ID.
// The caller would retrieve workflowSpecManifest and pass in.
func (r *ResourceManager) ToModelRunDetail(run *api.Run, runId string, workflow *util.Workflow, workflowSpecManifest string) (*model.RunDetail, error) {
params, err := toModelParameters(run.PipelineSpec.Parameters)
params, err := toModelParameters(run.GetPipelineSpec().GetParameters())
if err != nil {
return nil, util.Wrap(err, "Unable to parse the parameter.")
}
resourceReferences, err := r.toModelResourceReferences(runId, common.Run, run.ResourceReferences)
resourceReferences, err := r.toModelResourceReferences(runId, common.Run, run.GetResourceReferences())
if err != nil {
return nil, util.Wrap(err, "Unable to convert resource references.")
}
var pipelineName string
if run.PipelineSpec.GetPipelineId() != "" {
pipelineName, err = r.getResourceName(common.Pipeline, run.PipelineSpec.GetPipelineId())
if run.GetPipelineSpec().GetPipelineId() != "" {
pipelineName, err = r.getResourceName(common.Pipeline, run.GetPipelineSpec().GetPipelineId())
if err != nil {
return nil, util.Wrap(err, "Error getting the pipeline name")
}
Expand All @@ -63,7 +63,7 @@ func (r *ResourceManager) ToModelRunDetail(run *api.Run, runId string, workflow
Description: run.Description,
ResourceReferences: resourceReferences,
PipelineSpec: model.PipelineSpec{
PipelineId: run.PipelineSpec.GetPipelineId(),
PipelineId: run.GetPipelineSpec().GetPipelineId(),
PipelineName: pipelineName,
WorkflowSpecManifest: workflowSpecManifest,
Parameters: params,
Expand All @@ -76,17 +76,17 @@ func (r *ResourceManager) ToModelRunDetail(run *api.Run, runId string, workflow
}

func (r *ResourceManager) ToModelJob(job *api.Job, swf *util.ScheduledWorkflow, workflowSpecManifest string) (*model.Job, error) {
params, err := toModelParameters(job.PipelineSpec.Parameters)
params, err := toModelParameters(job.GetPipelineSpec().GetParameters())
if err != nil {
return nil, util.Wrap(err, "Error parsing the input job.")
}
resourceReferences, err := r.toModelResourceReferences(string(swf.UID), common.Job, job.ResourceReferences)
resourceReferences, err := r.toModelResourceReferences(string(swf.UID), common.Job, job.GetResourceReferences())
if err != nil {
return nil, util.Wrap(err, "Error to convert resource references.")
}
var pipelineName string
if job.PipelineSpec.GetPipelineId() != "" {
pipelineName, err = r.getResourceName(common.Pipeline, job.PipelineSpec.GetPipelineId())
if job.GetPipelineSpec().GetPipelineId() != "" {
pipelineName, err = r.getResourceName(common.Pipeline, job.GetPipelineSpec().GetPipelineId())
if err != nil {
return nil, util.Wrap(err, "Error getting the pipeline name")
}
Expand All @@ -103,7 +103,7 @@ func (r *ResourceManager) ToModelJob(job *api.Job, swf *util.ScheduledWorkflow,
MaxConcurrency: job.MaxConcurrency,
ResourceReferences: resourceReferences,
PipelineSpec: model.PipelineSpec{
PipelineId: job.PipelineSpec.GetPipelineId(),
PipelineId: job.GetPipelineSpec().GetPipelineId(),
PipelineName: pipelineName,
WorkflowSpecManifest: workflowSpecManifest,
Parameters: params,
Expand Down Expand Up @@ -238,6 +238,12 @@ func (r *ResourceManager) getResourceName(resourceType common.ResourceType, reso
return "", util.Wrap(err, "Referred run not found.")
}
return run.DisplayName, nil
case common.PipelineVersion:
version, err := r.GetPipelineVersion(resourceId)
if err != nil {
return "", util.Wrap(err, "Referred pipeline version not found.")
}
return version.Name, nil
default:
return "", util.NewInvalidInputError("Unsupported resource type: %s", string(resourceType))
}
Expand Down
67 changes: 58 additions & 9 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,16 @@ func (r *ResourceManager) GetPipelineTemplate(pipelineId string) ([]byte, error)
}

func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {
// Get workflow from pipeline spec, which might be pipeline ID or an argo workflow
// Get workflow from either of the two places:
// (1) raw pipeline manifest in pipeline_spec
// (2) pipeline version in resource_references
var workflowSpecManifestBytes []byte
workflowSpecManifestBytes, err := r.getWorkflowSpecBytes(apiRun.GetPipelineSpec())
if err != nil {
return nil, util.Wrap(err, "Failed to fetch workflow spec.")
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineVersion(apiRun.GetResourceReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to fetch workflow spec.")
}
}
uuid, err := r.uuid.NewRandom()
if err != nil {
Expand Down Expand Up @@ -282,7 +288,7 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {
}

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiRun.ResourceReferences)
ref, err := r.getDefaultExperimentIfNoExperiment(apiRun.GetResourceReferences())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,11 +447,18 @@ func (r *ResourceManager) GetJob(id string) (*model.Job, error) {
}

func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
// Get workflow from pipeline spec, which might be pipeline ID or an argo workflow
// Get workflow from either of the two places:
// (1) raw pipeline manifest in pipeline_spec
// (2) pipeline version in resource_references
var workflowSpecManifestBytes []byte
workflowSpecManifestBytes, err := r.getWorkflowSpecBytes(apiJob.GetPipelineSpec())
if err != nil {
return nil, util.Wrap(err, "Failed to fetch workflow spec.")
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineVersion(apiJob.GetResourceReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to fetch workflow spec.")
}
}

var workflow util.Workflow
err = json.Unmarshal(workflowSpecManifestBytes, &workflow)
if err != nil {
Expand All @@ -454,7 +467,7 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
}

// Verify no additional parameter provided
err = workflow.VerifyParameters(toParametersMap(apiJob.PipelineSpec.Parameters))
err = workflow.VerifyParameters(toParametersMap(apiJob.GetPipelineSpec().GetParameters()))
if err != nil {
return nil, util.Wrap(err, "Create job failed")
}
Expand All @@ -473,7 +486,7 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
MaxConcurrency: &apiJob.MaxConcurrency,
Trigger: *toCRDTrigger(apiJob.Trigger),
Workflow: &scheduledworkflow.WorkflowResource{
Parameters: toCRDParameter(apiJob.PipelineSpec.Parameters),
Parameters: toCRDParameter(apiJob.GetPipelineSpec().GetParameters()),
Spec: workflow.Spec,
},
},
Expand All @@ -484,12 +497,12 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
}

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiJob.ResourceReferences)
ref, err := r.getDefaultExperimentIfNoExperiment(apiJob.GetResourceReferences())
if err != nil {
return nil, err
}
if ref != nil {
apiJob.ResourceReferences = append(apiJob.ResourceReferences, ref)
apiJob.ResourceReferences = append(apiJob.GetResourceReferences(), ref)
}

job, err := r.ToModelJob(apiJob, util.NewScheduledWorkflow(newScheduledWorkflow), string(workflowSpecManifestBytes))
Expand Down Expand Up @@ -688,6 +701,8 @@ func (r *ResourceManager) checkRunExist(runID string) (*model.RunDetail, error)
}

func (r *ResourceManager) getWorkflowSpecBytes(spec *api.PipelineSpec) ([]byte, error) {
// TODO(jingzhang36): after FE is enabled to use pipeline version to create
// run, we'll only check for the raw manifest in pipeline_spec.
if spec.GetPipelineId() != "" {
var workflow util.Workflow
err := r.objectStore.GetFromYamlFile(&workflow, storage.CreatePipelinePath(spec.GetPipelineId()))
Expand All @@ -702,6 +717,25 @@ func (r *ResourceManager) getWorkflowSpecBytes(spec *api.PipelineSpec) ([]byte,
return nil, util.NewInvalidInputError("Please provide a valid pipeline spec")
}

func (r *ResourceManager) getWorkflowSpecBytesFromPipelineVersion(references []*api.ResourceReference) ([]byte, error) {
var pipelineVersionId = ""
for _, reference := range references {
if reference.Key.Type == api.ResourceType_PIPELINE_VERSION && reference.Relationship == api.Relationship_CREATOR {
pipelineVersionId = reference.Key.Id
}
}
if len(pipelineVersionId) == 0 {
return nil, util.NewInvalidInputError("No pipeline version.")
}
var workflow util.Workflow
err := r.objectStore.GetFromYamlFile(&workflow, storage.CreatePipelinePath(pipelineVersionId))
if err != nil {
return nil, util.Wrap(err, "Get pipeline YAML failed.")
}

return []byte(workflow.ToStringForStore()), nil
}

// Used to initialize the Experiment database with a default to be used for runs
func (r *ResourceManager) CreateDefaultExperiment() (string, error) {
// First check that we don't already have a default experiment ID in the DB.
Expand Down Expand Up @@ -896,3 +930,18 @@ func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error

return nil
}

func (r *ResourceManager) GetPipelineVersionTemplate(versionId string) ([]byte, error) {
// Verify pipeline version exist
_, err := r.pipelineStore.GetPipelineVersion(versionId)
if err != nil {
return nil, util.Wrap(err, "Get pipeline version template failed")
}

template, err := r.objectStore.GetFile(storage.CreatePipelinePath(fmt.Sprint(versionId)))
if err != nil {
return nil, util.Wrap(err, "Get pipeline version template failed")
}

return template, nil
}
Loading