Skip to content

Commit

Permalink
[Backend] swf catchup option integration tests (kubeflow#3139)
Browse files Browse the repository at this point in the history
* Make local testing easier

* Move cleanup to test setup stage

* Add readme for how to run integration tests

* Add warning about data loss

* Add warning also in the script

* Change flag to isDevMode and cleanup resources if not in dev mode

* Pass through arguments in the bash script

* Fix unit tests

* integration tests for swf NoCatchup option

* Also add cron catchup option tests

* Restructure test

* Adjust format

* Fix BUILD.bazel

* fix typo
  • Loading branch information
Bobgy authored and Jeffwan committed Dec 9, 2020
1 parent 4338cf0 commit b8da724
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
1 change: 1 addition & 0 deletions backend/test/integration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
"//backend/test:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_ghodss_yaml//:go_default_library",
"@com_github_go_openapi_strfmt//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//suite:go_default_library",
Expand Down
166 changes: 166 additions & 0 deletions backend/test/integration/job_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/kubeflow/pipelines/backend/test"

"github.com/go-openapi/strfmt"
"github.com/golang/glog"
experimentparams "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client/experiment_service"
"github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_model"
Expand All @@ -23,6 +24,12 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
)

const (
second = 1
minute = 60 * second
hour = 60 * minute
)

type JobApiTestSuite struct {
suite.Suite
namespace string
Expand Down Expand Up @@ -214,6 +221,109 @@ func (s *JobApiTestSuite) TestJobApis() {
s.checkArgParamsRun(t, argParamsRun, argParamsExperiment.ID, argParamsExperiment.Name, argParamsJob.ID, argParamsJob.Name)
}

func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
t := s.T()

/* ---------- Upload pipelines YAML ---------- */
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
assert.Nil(t, err)

/* ---------- Create a periodic job with start and end date in the past and catchup = true ---------- */
experiment := &experiment_model.APIExperiment{Name: "periodic catchup true"}
periodicCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job := jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: periodicCatchupTrueExperiment.ID,
periodic: true,
})
job.Name = "periodic-catchup-true-"
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule."
job.NoCatchup = false // This is the key difference.
createJobRequest := &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

/* -------- Create another periodic job with start and end date in the past but catchup = false ------ */
experiment = &experiment_model.APIExperiment{Name: "periodic catchup false"}
periodicCatchupFalseExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: periodicCatchupFalseExperiment.ID,
periodic: true,
})
job.Name = "periodic-catchup-false-"
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule."
job.NoCatchup = true // This is the key difference.
createJobRequest = &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

/* ---------- Create a cron job with start and end date in the past and catchup = true ---------- */
experiment = &experiment_model.APIExperiment{Name: "cron catchup true"}
cronCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: cronCatchupTrueExperiment.ID,
periodic: false,
})
job.Name = "cron-catchup-true-"
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule."
job.NoCatchup = false // This is the key difference.
createJobRequest = &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

/* -------- Create another cron job with start and end date in the past but catchup = false ------ */
experiment = &experiment_model.APIExperiment{Name: "cron catchup false"}
cronCatchupFalseExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: cronCatchupFalseExperiment.ID,
periodic: false,
})
job.Name = "cron-catchup-false-"
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule."
job.NoCatchup = true // This is the key difference.
createJobRequest = &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

// The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent.
// This could take a few seconds to finish.
// TODO: Retry list run every 5 seconds instead of sleeping for 40 seconds.
time.Sleep(40 * time.Second)

/* ---------- Assert number of runs when catchup = true ---------- */
_, runsWhenCatchupTrue, _, err := s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(periodicCatchupTrueExperiment.ID)})
assert.Nil(t, err)
assert.Equal(t, 2, runsWhenCatchupTrue)
_, runsWhenCatchupTrue, _, err = s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(cronCatchupTrueExperiment.ID)})

/* ---------- Assert number of runs when catchup = false ---------- */
_, runsWhenCatchupFalse, _, err := s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(periodicCatchupFalseExperiment.ID)})
assert.Nil(t, err)
assert.Equal(t, 1, runsWhenCatchupFalse)
_, runsWhenCatchupFalse, _, err = s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(cronCatchupFalseExperiment.ID)})
assert.Nil(t, err)
assert.Equal(t, 1, runsWhenCatchupFalse)
}

func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineID string) {
// Check workflow manifest is not empty
assert.Contains(t, job.PipelineSpec.WorkflowManifest, "whalesay")
Expand Down Expand Up @@ -320,9 +430,65 @@ func (s *JobApiTestSuite) TearDownSuite() {
}
}

/** ======== the following are util functions ========= **/

func (s *JobApiTestSuite) cleanUp() {
test.DeleteAllExperiments(s.experimentClient, s.T())
test.DeleteAllPipelines(s.pipelineClient, s.T())
test.DeleteAllJobs(s.jobClient, s.T())
test.DeleteAllRuns(s.runClient, s.T())
}

func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob {
return &job_model.APIJob{
Name: "default-pipeline-name",
Description: "This is a default pipeline",
PipelineSpec: &job_model.APIPipelineSpec{
PipelineID: pipelineId,
},
ResourceReferences: []*job_model.APIResourceReference{
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentId},
Relationship: job_model.APIRelationshipOWNER},
},
MaxConcurrency: 10,
NoCatchup: false,
Trigger: &job_model.APITrigger{
PeriodicSchedule: &job_model.APIPeriodicSchedule{
StartTime: strfmt.NewDateTime(),
EndTime: strfmt.NewDateTime(),
IntervalSecond: 60,
},
},
Enabled: true,
}
}

type jobOptions struct {
pipelineId, experimentId string
periodic bool
}

func jobInThePastForTwoMinutes(options jobOptions) *job_model.APIJob {
startTime := strfmt.DateTime(time.Unix(10*hour, 0))
endTime := strfmt.DateTime(time.Unix(10*hour+2*minute, 0))

job := defaultApiJob(options.pipelineId, options.experimentId)
if options.periodic {
job.Trigger = &job_model.APITrigger{
PeriodicSchedule: &job_model.APIPeriodicSchedule{
StartTime: startTime,
EndTime: endTime,
IntervalSecond: 60, // Runs every 1 minute.
},
}
} else {
job.Trigger = &job_model.APITrigger{
CronSchedule: &job_model.APICronSchedule{
StartTime: startTime,
EndTime: endTime,
Cron: "0 * * * * ?", // Runs every 1 minute.
},
}
}
return job
}

0 comments on commit b8da724

Please sign in to comment.