Skip to content

Commit

Permalink
Add pipeline version to job/run integration test so that job/run is c… (
Browse files Browse the repository at this point in the history
kubeflow#3270)

* Add pipeline version to job/run integration test so that job/run is created from pipeline version

* Use set comparison to check array equation in test

* address comments to use elements match
  • Loading branch information
jingzhang36 authored and Jeffwan committed Dec 9, 2020
1 parent 1dd77e0 commit 7594553
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 53 deletions.
89 changes: 54 additions & 35 deletions backend/test/integration/job_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ func (s *JobApiTestSuite) TestJobApis() {
helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
assert.Nil(t, err)

/* ---------- Upload pipeline version YAML ---------- */
time.Sleep(1 * time.Second)
helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
"../resources/hello-world.yaml", &uploadParams.UploadPipelineVersionParams{
Name: util.StringPointer("hello-world-version"),
Pipelineid: util.StringPointer(helloWorldPipeline.ID),
})
assert.Nil(t, err)

/* ---------- Create a new hello world experiment ---------- */
experiment := &experiment_model.APIExperiment{Name: "hello world experiment"}
helloWorldExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
Expand All @@ -97,24 +106,23 @@ func (s *JobApiTestSuite) TestJobApis() {
createJobRequest := &jobparams.CreateJobParams{Body: &job_model.APIJob{
Name: "hello world",
Description: "this is hello world",
PipelineSpec: &job_model.APIPipelineSpec{
PipelineID: helloWorldPipeline.ID,
},
ResourceReferences: []*job_model.APIResourceReference{
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: helloWorldExperiment.ID},
Relationship: job_model.APIRelationshipOWNER},
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: helloWorldPipelineVersion.ID},
Relationship: job_model.APIRelationshipCREATOR},
},
MaxConcurrency: 10,
Enabled: true,
}}
helloWorldJob, err := s.jobClient.Create(createJobRequest)
assert.Nil(t, err)
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipeline.ID)
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipelineVersion.ID, helloWorldPipelineVersion.Name)

/* ---------- Get hello world job ---------- */
helloWorldJob, err = s.jobClient.Get(&jobparams.GetJobParams{ID: helloWorldJob.ID})
assert.Nil(t, err)
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipeline.ID)
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipelineVersion.ID, helloWorldPipelineVersion.Name)

/* ---------- Create a new argument parameter experiment ---------- */
experiment = &experiment_model.APIExperiment{Name: "argument parameter experiment"}
Expand Down Expand Up @@ -228,15 +236,24 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
assert.Nil(t, err)

/* ---------- Upload pipeline version YAML ---------- */
time.Sleep(1 * time.Second)
helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
"../resources/hello-world.yaml", &uploadParams.UploadPipelineVersionParams{
Name: util.StringPointer("hello-world-version"),
Pipelineid: util.StringPointer(pipeline.ID),
})
assert.Nil(t, err)

/* ---------- Create a periodic job with start and end date in the past and catchup = true ---------- */
experiment := &experiment_model.APIExperiment{Name: "periodic catchup true"}
periodicCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job := jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: periodicCatchupTrueExperiment.ID,
periodic: true,
pipelineVersionId: helloWorldPipelineVersion.ID,
experimentId: periodicCatchupTrueExperiment.ID,
periodic: true,
})
job.Name = "periodic-catchup-true-"
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule."
Expand All @@ -251,9 +268,9 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: periodicCatchupFalseExperiment.ID,
periodic: true,
pipelineVersionId: helloWorldPipelineVersion.ID,
experimentId: periodicCatchupFalseExperiment.ID,
periodic: true,
})
job.Name = "periodic-catchup-false-"
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule."
Expand All @@ -268,9 +285,9 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: cronCatchupTrueExperiment.ID,
periodic: false,
pipelineVersionId: helloWorldPipelineVersion.ID,
experimentId: cronCatchupTrueExperiment.ID,
periodic: false,
})
job.Name = "cron-catchup-true-"
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule."
Expand All @@ -285,9 +302,9 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: cronCatchupFalseExperiment.ID,
periodic: false,
pipelineVersionId: helloWorldPipelineVersion.ID,
experimentId: cronCatchupFalseExperiment.ID,
periodic: false,
})
job.Name = "cron-catchup-false-"
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule."
Expand Down Expand Up @@ -324,31 +341,36 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
assert.Equal(t, 1, runsWhenCatchupFalse)
}

func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineID string) {
func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineVersionId string, pipelineVersionName string) {
// Check workflow manifest is not empty
assert.Contains(t, job.PipelineSpec.WorkflowManifest, "whalesay")

// Check resource references contain experiment and pipeline version.
resourceReferences := []*job_model.APIResourceReference{
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentID},
Name: experimentName, Relationship: job_model.APIRelationshipOWNER,
},
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: pipelineVersionId},
Name: pipelineVersionName, Relationship: job_model.APIRelationshipCREATOR},
}
assert.ElementsMatch(t, job.ResourceReferences, resourceReferences)

// Check other fields in job object (other than resource references)
job.ResourceReferences = nil
expectedJob := &job_model.APIJob{
ID: job.ID,
Name: "hello world",
Description: "this is hello world",
PipelineSpec: &job_model.APIPipelineSpec{
PipelineID: pipelineID,
PipelineName: "hello-world.yaml",
WorkflowManifest: job.PipelineSpec.WorkflowManifest,
},
ResourceReferences: []*job_model.APIResourceReference{
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentID},
Name: experimentName, Relationship: job_model.APIRelationshipOWNER,
},
},
MaxConcurrency: 10,
Enabled: true,
CreatedAt: job.CreatedAt,
UpdatedAt: job.UpdatedAt,
Status: job.Status,
Trigger: &job_model.APITrigger{},
}

assert.Equal(t, expectedJob, job)
}

Expand Down Expand Up @@ -420,8 +442,6 @@ func TestJobApi(t *testing.T) {
suite.Run(t, new(JobApiTestSuite))
}

// TODO(jingzhang36): include UploadPipelineVersion in integration test

func (s *JobApiTestSuite) TearDownSuite() {
if *runIntegrationTests {
if !*isDevMode {
Expand All @@ -439,16 +459,15 @@ func (s *JobApiTestSuite) cleanUp() {
test.DeleteAllRuns(s.runClient, s.T())
}

func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob {
func defaultApiJob(pipelineVersionId, experimentId string) *job_model.APIJob {
return &job_model.APIJob{
Name: "default-pipeline-name",
Description: "This is a default pipeline",
PipelineSpec: &job_model.APIPipelineSpec{
PipelineID: pipelineId,
},
ResourceReferences: []*job_model.APIResourceReference{
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentId},
Relationship: job_model.APIRelationshipOWNER},
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: pipelineVersionId},
Relationship: job_model.APIRelationshipCREATOR},
},
MaxConcurrency: 10,
NoCatchup: false,
Expand All @@ -464,15 +483,15 @@ func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob {
}

type jobOptions struct {
pipelineId, experimentId string
periodic bool
pipelineVersionId, experimentId string
periodic bool
}

func jobInThePastForTwoMinutes(options jobOptions) *job_model.APIJob {
startTime := strfmt.DateTime(time.Unix(10*hour, 0))
endTime := strfmt.DateTime(time.Unix(10*hour+2*minute, 0))

job := defaultApiJob(options.pipelineId, options.experimentId)
job := defaultApiJob(options.pipelineVersionId, options.experimentId)
if options.periodic {
job.Trigger = &job_model.APITrigger{
PeriodicSchedule: &job_model.APIPeriodicSchedule{
Expand Down
51 changes: 33 additions & 18 deletions backend/test/integration/run_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration
import (
"io/ioutil"
"testing"
"time"

"github.com/kubeflow/pipelines/backend/test"

Expand Down Expand Up @@ -72,31 +73,39 @@ func (s *RunApiTestSuite) TestRunApis() {
helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
assert.Nil(t, err)

/* ---------- Upload a pipeline version YAML under helloWorldPipeline ---------- */
time.Sleep(1 * time.Second)
helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
"../resources/hello-world.yaml", &uploadParams.UploadPipelineVersionParams{
Name: util.StringPointer("hello-world-version"),
Pipelineid: util.StringPointer(helloWorldPipeline.ID),
})
assert.Nil(t, err)

/* ---------- Create a new hello world experiment ---------- */
experiment := &experiment_model.APIExperiment{Name: "hello world experiment"}
helloWorldExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

/* ---------- Create a new hello world run by specifying pipeline ID ---------- */
/* ---------- Create a new hello world run by specifying pipeline version ID ---------- */
createRunRequest := &runparams.CreateRunParams{Body: &run_model.APIRun{
Name: "hello world",
Description: "this is hello world",
PipelineSpec: &run_model.APIPipelineSpec{
PipelineID: helloWorldPipeline.ID,
},
ResourceReferences: []*run_model.APIResourceReference{
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypeEXPERIMENT, ID: helloWorldExperiment.ID},
Name: helloWorldExperiment.Name, Relationship: run_model.APIRelationshipOWNER},
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypePIPELINEVERSION, ID: helloWorldPipelineVersion.ID},
Relationship: run_model.APIRelationshipCREATOR},
},
}}
helloWorldRunDetail, _, err := s.runClient.Create(createRunRequest)
assert.Nil(t, err)
s.checkHelloWorldRunDetail(t, helloWorldRunDetail, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipeline.ID)
s.checkHelloWorldRunDetail(t, helloWorldRunDetail, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipelineVersion.ID, helloWorldPipelineVersion.Name)

/* ---------- Get hello world run ---------- */
helloWorldRunDetail, _, err = s.runClient.Get(&runparams.GetRunParams{RunID: helloWorldRunDetail.Run.ID})
assert.Nil(t, err)
s.checkHelloWorldRunDetail(t, helloWorldRunDetail, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipeline.ID)
s.checkHelloWorldRunDetail(t, helloWorldRunDetail, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipelineVersion.ID, helloWorldPipelineVersion.Name)

/* ---------- Create a new argument parameter experiment ---------- */
createExperimentRequest := &experimentparams.CreateExperimentParams{Body: &experiment_model.APIExperiment{Name: "argument parameter experiment"}}
Expand Down Expand Up @@ -196,16 +205,23 @@ func (s *RunApiTestSuite) TestRunApis() {
longRunningPipeline, err := s.pipelineUploadClient.UploadFile("../resources/long-running.yaml", uploadParams.NewUploadPipelineParamsWithTimeout(350))
assert.Nil(t, err)

/* ---------- Upload a long-running pipeline version YAML under longRunningPipeline ---------- */
time.Sleep(1 * time.Second)
longRunningPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion("../resources/long-running.yaml", &uploadParams.UploadPipelineVersionParams{
Name: util.StringPointer("long-running-version"),
Pipelineid: util.StringPointer(longRunningPipeline.ID),
})
assert.Nil(t, err)

/* ---------- Create a new long-running run by specifying pipeline ID ---------- */
createLongRunningRunRequest := &runparams.CreateRunParams{Body: &run_model.APIRun{
Name: "long running",
Description: "this pipeline will run long enough for us to manually terminate it before it finishes",
PipelineSpec: &run_model.APIPipelineSpec{
PipelineID: longRunningPipeline.ID,
},
ResourceReferences: []*run_model.APIResourceReference{
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypeEXPERIMENT, ID: helloWorldExperiment.ID},
Relationship: run_model.APIRelationshipOWNER},
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypePIPELINEVERSION, ID: longRunningPipelineVersion.ID},
Relationship: run_model.APIRelationshipCREATOR},
},
}}
longRunningRunDetail, _, err := s.runClient.Create(createLongRunningRunRequest)
Expand All @@ -219,10 +235,10 @@ func (s *RunApiTestSuite) TestRunApis() {
/* ---------- Get long-running run ---------- */
longRunningRunDetail, _, err = s.runClient.Get(&runparams.GetRunParams{RunID: longRunningRunDetail.Run.ID})
assert.Nil(t, err)
s.checkTerminatedRunDetail(t, longRunningRunDetail, helloWorldExperiment.ID, helloWorldExperiment.Name, longRunningPipeline.ID)
s.checkTerminatedRunDetail(t, longRunningRunDetail, helloWorldExperiment.ID, helloWorldExperiment.Name, longRunningPipelineVersion.ID, longRunningPipelineVersion.Name)
}

func (s *RunApiTestSuite) checkTerminatedRunDetail(t *testing.T, runDetail *run_model.APIRunDetail, experimentId string, experimentName string, pipelineId string) {
func (s *RunApiTestSuite) checkTerminatedRunDetail(t *testing.T, runDetail *run_model.APIRunDetail, experimentId string, experimentName string, pipelineVersionId string, pipelineVersionName string) {
// Check workflow manifest is not empty
assert.Contains(t, runDetail.Run.PipelineSpec.WorkflowManifest, "wait-awhile")
// Check runtime workflow manifest is not empty
Expand All @@ -234,14 +250,14 @@ func (s *RunApiTestSuite) checkTerminatedRunDetail(t *testing.T, runDetail *run_
Description: "this pipeline will run long enough for us to manually terminate it before it finishes",
Status: "Terminating",
PipelineSpec: &run_model.APIPipelineSpec{
PipelineID: pipelineId,
PipelineName: "long-running.yaml",
WorkflowManifest: runDetail.Run.PipelineSpec.WorkflowManifest,
},
ResourceReferences: []*run_model.APIResourceReference{
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypeEXPERIMENT, ID: experimentId},
Name: experimentName, Relationship: run_model.APIRelationshipOWNER,
},
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypePIPELINEVERSION, ID: pipelineVersionId},
Name: pipelineVersionName, Relationship: run_model.APIRelationshipCREATOR},
},
CreatedAt: runDetail.Run.CreatedAt,
ScheduledAt: runDetail.Run.ScheduledAt,
Expand All @@ -250,7 +266,7 @@ func (s *RunApiTestSuite) checkTerminatedRunDetail(t *testing.T, runDetail *run_
assert.Equal(t, expectedRun, runDetail.Run)
}

func (s *RunApiTestSuite) checkHelloWorldRunDetail(t *testing.T, runDetail *run_model.APIRunDetail, experimentId string, experimentName string, pipelineId string) {
func (s *RunApiTestSuite) checkHelloWorldRunDetail(t *testing.T, runDetail *run_model.APIRunDetail, experimentId string, experimentName string, pipelineVersionId string, pipelineVersionName string) {
// Check workflow manifest is not empty
assert.Contains(t, runDetail.Run.PipelineSpec.WorkflowManifest, "whalesay")
// Check runtime workflow manifest is not empty
Expand All @@ -262,14 +278,15 @@ func (s *RunApiTestSuite) checkHelloWorldRunDetail(t *testing.T, runDetail *run_
Description: "this is hello world",
Status: runDetail.Run.Status,
PipelineSpec: &run_model.APIPipelineSpec{
PipelineID: pipelineId,
PipelineName: "hello-world.yaml",
WorkflowManifest: runDetail.Run.PipelineSpec.WorkflowManifest,
},
ResourceReferences: []*run_model.APIResourceReference{
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypeEXPERIMENT, ID: experimentId},
Name: experimentName, Relationship: run_model.APIRelationshipOWNER,
},
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypePIPELINEVERSION, ID: pipelineVersionId},
Name: pipelineVersionName, Relationship: run_model.APIRelationshipCREATOR,
},
},
CreatedAt: runDetail.Run.CreatedAt,
ScheduledAt: runDetail.Run.ScheduledAt,
Expand Down Expand Up @@ -313,8 +330,6 @@ func TestRunApi(t *testing.T) {
suite.Run(t, new(RunApiTestSuite))
}

// TODO(jingzhang36): include UploadPipelineVersion in integration test

func (s *RunApiTestSuite) TearDownSuite() {
if *runIntegrationTests {
if !*isDevMode {
Expand Down
1 change: 1 addition & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7594553

Please sign in to comment.