Skip to content

Commit b7a0415

Browse files
committed
feat(rst): Add priority and wait queue to sync.
Add priority option to ctl push/pull commands with five priority levels. Add IsWorkRequestReady() method to the rst.Provider interface to validate work is ready. * Before work is processed, IsWorkRequestReady() is called to determine if it should be added to the wait queue. Fix invalid submission id parsing which resulted in each pullInNewWork() call to return "".
1 parent 7cc4235 commit b7a0415

File tree

9 files changed

+403
-42
lines changed

9 files changed

+403
-42
lines changed

common/rst/builder.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func (c *JobBuilderClient) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.Job
3838
Path: cfg.Path,
3939
RemoteStorageTarget: 0,
4040
StubLocal: cfg.StubLocal,
41+
Priority: cfg.Priority,
4142
Force: cfg.Force,
4243
Type: &beeremote.JobRequest_Builder{
4344
Builder: &flex.BuilderJob{
@@ -92,6 +93,10 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq
9293
return c.executeJobBuilderRequest(ctx, workRequest, walkChan, jobSubmissionChan, cfg)
9394
}
9495

96+
func (r *JobBuilderClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Time, error) {
97+
return true, time.Time{}, nil
98+
}
99+
95100
// ExecuteWorkRequestPart is not implemented and should never be called.
96101
func (c *JobBuilderClient) ExecuteWorkRequestPart(ctx context.Context, request *flex.WorkRequest, part *flex.Work_Part) error {
97102
return ErrUnsupportedOpForRST

common/rst/mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,7 @@ func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequest
126126
func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) {
127127
return "", ErrUnsupportedOpForRST
128128
}
129+
130+
func (r *MockClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Time, error) {
131+
return true, time.Time{}, nil
132+
}

common/rst/rst.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ type Provider interface {
110110
GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, err error)
111111
// GenerateExternalId can be used to generate an identifier for remote operations.
112112
GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (externalId string, err error)
113+
// IsWorkRequestReady is used to indicate when the work request is ready and will be used to
114+
// start work requests that have been placed into a wait queue. This is useful for providers
115+
// that need the ability to wait for resources to be made available before continuing.
116+
IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (ready bool, retryTime time.Time, err error)
113117
}
114118

115119
// New initializes a provider client based on the provided config. It accepts a context that can be

common/rst/s3.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (r *S3Client) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest
9191
Path: cfg.Path,
9292
RemoteStorageTarget: cfg.RemoteStorageTarget,
9393
StubLocal: cfg.StubLocal,
94+
Priority: cfg.Priority,
9495
Force: cfg.Force,
9596
Type: &beeremote.JobRequest_Sync{
9697
Sync: &flex.SyncJob{
@@ -114,6 +115,7 @@ func (r *S3Client) getJobRequestCfg(request *beeremote.JobRequest) *flex.JobRequ
114115
StubLocal: request.StubLocal,
115116
Overwrite: sync.Overwrite,
116117
Flatten: sync.Flatten,
118+
Priority: request.Priority,
117119
Force: request.Force,
118120
LockedInfo: sync.LockedInfo,
119121
}
@@ -180,6 +182,10 @@ func (r *S3Client) ExecuteJobBuilderRequest(ctx context.Context, workRequest *fl
180182
return ErrUnsupportedOpForRST
181183
}
182184

185+
func (r *S3Client) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Time, error) {
186+
return true, time.Time{}, nil
187+
}
188+
183189
func (r *S3Client) ExecuteWorkRequestPart(ctx context.Context, request *flex.WorkRequest, part *flex.Work_Part) error {
184190
if !request.HasSync() {
185191
return ErrReqAndRSTTypeMismatch

ctl/internal/cmd/rst/pushpull.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ WARNING: Files are always uploaded and existing files overwritten unless the rem
4242
if len(args) > 1 {
4343
return fmt.Errorf("invalid number of arguments. Be sure to quote file glob pattern")
4444
}
45+
if backendCfg.Priority < 1 || backendCfg.Priority > 5 {
46+
return fmt.Errorf("invalid priority! Priority must be between 1 and 5")
47+
}
48+
backendCfg.Priority--
4549
return nil
4650
},
4751
RunE: func(cmd *cobra.Command, args []string) error {
@@ -50,6 +54,7 @@ WARNING: Files are always uploaded and existing files overwritten unless the rem
5054
},
5155
}
5256
cmd.Flags().Uint32VarP(&backendCfg.RemoteStorageTarget, "remote-target", "r", 0, "Perform a one time push to the specified Remote Storage Target ID.")
57+
cmd.Flags().Int32Var(&backendCfg.Priority, "priority", 3, "Job priority level (1-5; 1=lowest, 5=highest)")
5358
cmd.Flags().BoolVar(&backendCfg.Force, "force", false, "Force push file(s) to the remote target even if the file is already in sync or another client currently has them open for writing (note the job may later fail or the uploaded file may not be the latest version).")
5459
cmd.Flags().MarkHidden("force")
5560
cmd.Flags().BoolVarP(&frontendCfg.verbose, "verbose", "v", false, "Print additional details about each job (use --debug) to also print work requests and results.")
@@ -70,6 +75,10 @@ func newPullCmd() *cobra.Command {
7075
if len(args) != 1 {
7176
return fmt.Errorf("missing <path> argument")
7277
}
78+
if backendCfg.Priority < 1 || backendCfg.Priority > 5 {
79+
return fmt.Errorf("invalid priority! Priority must be between 1 and 5")
80+
}
81+
backendCfg.Priority--
7382
return nil
7483
},
7584
RunE: func(cmd *cobra.Command, args []string) error {
@@ -83,6 +92,7 @@ func newPullCmd() *cobra.Command {
8392
cmd.Flags().BoolVarP(&backendCfg.StubLocal, "stub-local", "s", false, "Create stub files for the remote objects or files.")
8493
cmd.Flags().BoolVar(&backendCfg.Flatten, "flatten", false, "Flatten the remote directory structure. The directory delimiter will be replaced with an underscore.")
8594
cmd.Flags().BoolVar(&backendCfg.Force, "force", false, "Force pulling file(s) from the remote target even if the file is already in sync or another client currently has them open for reading or writing (note other clients may see errors, the job may later fail, or the downloaded file may not be the latest version).")
95+
cmd.Flags().Int32Var(&backendCfg.Priority, "priority", 3, "Job priority level (1-5; 1=lowest, 5=highest)")
8696
cmd.Flags().MarkHidden("force")
8797
cmd.Flags().BoolVarP(&frontendCfg.verbose, "verbose", "v", false, "Print additional details about each job (use --debug) to also print work requests and results.")
8898
cmd.Flags().IntVar(&frontendCfg.width, "column-width", 35, "Set the maximum width of some columns before they overflow.")

0 commit comments

Comments
 (0)