Skip to content

Conversation

swartzn
Copy link
Contributor

@swartzn swartzn commented Jun 25, 2025

What does this PR do / why do we need it?

  • Add priority option to ctl push/pull commands with five priority levels.
  • Add IsWorkRequestReady() method to the rst.Provider interface to validate work is ready.
    • Before processing a work request, IsWorkRequestReady() is called to check if the client is prepared to handle it. Clients can use this hook to verify resource availability or other preconditions. If not ready, the request is deferred into the wait queue for the duration returned by IsWorkRequestReady().
  • Unit tests have been added to verify the database scheme
  • The following code was injected into the s3 provider to test the wait queue,
func (r *S3Client) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Time, error) {
	// Only for testing
	_, err := r.mountPoint.Stat(request.JobId)
	if err != nil && errors.Is(err, fs.ErrNotExist) {
		r.mountPoint.CreateWriteClose(request.JobId, []byte{}, 0600, false)
		retryTime := time.Now().Add(5 * time.Second)
		return false, retryTime, nil
	}
	return true, time.Time{}, nil
}

Note: In order to accommodate the addition of priorities the following protobuf branch is needed: swartzn/add-priority-wait-queue

Where should the reviewer(s) start reviewing this?

There are two commits.

  1. Extends mapStore functionality to support getting entries across ranges and exposes GenerateNextPK() so consumers can utilize the counter.
  2. Adds a second database for the wait queue and priority schema for the 13-character base-36 submission id key.

The changes to mapStore are independent in the first commit and should be reviewed first . Afterwards, understand the scheme in rst/sync/internal/workmgr/utils.go:L184 before proceeding to the workmgr changes rst/sync/internal/workmgr/manager.go

Checklist before merging:

When creating a PR these are items to keep in mind that cannot be checked by GitHub actions:

  • Documentation:
    • Does developer documentation (code comments, readme, etc.) need to be added or updated?
    • Does the user documentation need to be expanded or updated for this change?
  • Testing:
    • Does this functionality require changing or adding new unit tests?
    • Does this functionality require changing or adding new integration tests?
  • Git Hygiene:

For more details refer to the Go coding standards and the pull request process.

@swartzn swartzn requested a review from a team as a code owner June 25, 2025 21:12
@swartzn swartzn changed the title Swartzn/add priority wait queue Swartzn/add priority and wait queue Jun 26, 2025
@swartzn swartzn force-pushed the swartzn/add-priority-wait-queue branch 3 times, most recently from b7a0415 to 48ed876 Compare June 26, 2025 13:17
return "", fmt.Errorf("unable to generate next PK sequence: %w", err)
}
return fmt.Sprintf("%0*s", s.config.pkSeqWidth, strconv.FormatInt(int64(seq), s.config.pkSeqBase)), nil
return fmt.Sprintf("%0*s", s.config.pkSeqWidth, strconv.FormatUint(seq, s.config.pkSeqBase)), nil
Copy link
Contributor Author

@swartzn swartzn Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iamjoemccormick I wasn't able to find any reason seq had to be an int64. This way when a key wraps it'll go back to zero instead of negative.

nextExpectedSubmissionID := ""
nextExpectedSubmissionID := startAtSubmissionID
if lastSubmissionID != "" {
lastSubmissionIDInt64, err := strconv.ParseInt(lastSubmissionID, 10, 64)
Copy link
Contributor Author

@swartzn swartzn Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parsing the submission id as base-10 resulted in nextExpectedSubmissionID being set to the beginning each time.

@swartzn swartzn linked an issue Jun 27, 2025 that may be closed by this pull request
sundereshwar
sundereshwar previously approved these changes Jul 4, 2025
@swartzn
Copy link
Contributor Author

swartzn commented Aug 4, 2025

@swartzn Squash commits before merging

Add priority option to ctl push/pull commands with five priority levels.
Add IsWorkRequestReady() method to the rst.Provider interface to validate work is ready.
* Before work is processed, IsWorkRequestReady() is called to determine if it should be added to the wait queue.

Fix invalid submission id parsing which resulted in each pullInNewWork() call to return "".
@swartzn swartzn force-pushed the swartzn/add-priority-wait-queue branch from 294dd70 to 3afcac2 Compare September 24, 2025 16:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add sync priority and wait queue
2 participants