Skip to content

Improve the queue performance by adding an explicit lock mechanism#129

Merged
jun-he merged 2 commits intomainfrom
jun/improve-queue
Sep 5, 2025
Merged

Improve the queue performance by adding an explicit lock mechanism#129
jun-he merged 2 commits intomainfrom
jun/improve-queue

Conversation

@jun-he
Copy link
Contributor

@jun-he jun-he commented Aug 5, 2025

Pull Request type

  • Bugfix
  • Feature
  • Refactoring (no functional changes, no api changes)
  • Build related changes (Please run ./gradlew build --write-locks to refresh dependencies)
  • Other (please describe): performance improvement

NOTE: Please remember to run ./gradlew spotlessApply to fix any format violations.

Changes in this PR

Improve the queue performance by adding an explicit lock mechanism because SKIP LOCKED does not perform well if there are a large number of rows.

@jun-he jun-he requested a review from Copilot August 5, 2025 20:13
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR improves queue performance by implementing an explicit database locking mechanism to replace the previous SKIP LOCKED approach when dealing with large numbers of rows. The changes introduce a new locking table structure and modify the dequeue operation to use explicit locks instead of relying solely on database-level row locking.

Key changes:

  • Added explicit lock mechanism using a special queue entry (queue_id=0) to control access to queue operations
  • Modified dequeue operation to check for locks before processing messages and removed SKIP LOCKED clause
  • Enhanced logging in SubworkflowStepRuntime for better debugging and error tracking

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
MaestroQueueDao.java Implements core locking mechanism with addLock method and updates dequeue logic to use explicit locks
MaestroQueueWorkerService.java Adds lock initialization during worker service startup
MaestroQueueDaoTest.java Updates tests to work with new locking mechanism and adds test coverage for addLock functionality
SubworkflowStepRuntime.java Improves logging with better error messages and workflow identity information

} else {
return Collections.emptyList();
}
}
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nested try-with-resources blocks and conditional logic create complex control flow. Consider extracting the lock checking and message dequeuing logic into separate methods to improve readability and maintainability.

Copilot uses AI. Check for mistakes.
"INSERT INTO maestro_queue (queue_id,owned_until,msg_id,payload,create_time) "
+ "VALUES (0,0,?,'lock',EXTRACT(EPOCH FROM NOW())::INT8*1000) ON CONFLICT DO NOTHING";
private static final String LOCK_SCAN_QUERY =
"SELECT msg_id FROM maestro_queue WHERE queue_id=0 AND owned_until=0 AND msg_id=? FOR UPDATE SKIP LOCKED";
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic values '0' for queue_id and owned_until, and 'lock' for payload should be defined as named constants to improve code maintainability and prevent potential errors.

Suggested change
"SELECT msg_id FROM maestro_queue WHERE queue_id=0 AND owned_until=0 AND msg_id=? FOR UPDATE SKIP LOCKED";
+ "VALUES (" + LOCK_QUEUE_ID + "," + LOCK_OWNED_UNTIL + ",?,'" + LOCK_PAYLOAD + "',EXTRACT(EPOCH FROM NOW())::INT8*1000) ON CONFLICT DO NOTHING";
private static final String LOCK_SCAN_QUERY =
"SELECT msg_id FROM maestro_queue WHERE queue_id=" + LOCK_QUEUE_ID + " AND owned_until=" + LOCK_OWNED_UNTIL + " AND msg_id=? FOR UPDATE SKIP LOCKED";

Copilot uses AI. Check for mistakes.
dao.remove(new MessageDto(0, String.valueOf(queueId), null, 0));
assertEquals(1, dao.addLock(queueId));
assertEquals(0, dao.addLock(queueId));
dao.remove(new MessageDto(0, String.valueOf(queueId), null, 0));
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic value '0' for msgId in the test should be defined as a constant or use a more descriptive approach to create lock removal messages, as this pattern is repeated multiple times in the test.

Suggested change
dao.remove(new MessageDto(0, String.valueOf(queueId), null, 0));
dao.remove(new MessageDto(LOCK_REMOVAL_MSG_ID, String.valueOf(queueId), null, 0));
assertEquals(1, dao.addLock(queueId));
assertEquals(0, dao.addLock(queueId));
dao.remove(new MessageDto(LOCK_REMOVAL_MSG_ID, String.valueOf(queueId), null, 0));

Copilot uses AI. Check for mistakes.
res.getLong(++idx),
res.getString(++idx),
res.getString(++idx),
res.getLong(++idx)));
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable 'idx' is reset inside the while loop but was already declared and used outside the loop. Consider declaring a new variable within the loop scope to avoid confusion and potential bugs.

Copilot uses AI. Check for mistakes.
workflowSummary.getIdentity(),
runtimeSummary.getIdentity(),
e);
// todo improve this error handling to gracefully clean up all resources
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO comment indicates incomplete error handling implementation. This should be tracked in an issue or implemented as part of this PR since it affects production code reliability.

Suggested change
// todo improve this error handling to gracefully clean up all resources
// Attempt to clean up resources associated with this subworkflow step runtime
try {
cleanupResources(workflowSummary, step, runtimeSummary);
} catch (Exception cleanupEx) {
LOG.error("Error during resource cleanup for subworkflow step runtime {}{}: {}", workflowSummary.getIdentity(), runtimeSummary.getIdentity(), cleanupEx);
}

Copilot uses AI. Check for mistakes.
…cause SKIP LOCKED does not perform well if there are a large number of rows.
@jun-he jun-he force-pushed the jun/improve-queue branch 2 times, most recently from 08cce35 to 0d2ea17 Compare August 28, 2025 22:05
@jun-he jun-he force-pushed the jun/improve-queue branch from 0d2ea17 to 91184d8 Compare August 28, 2025 22:08
@jun-he jun-he merged commit 0a42c31 into main Sep 5, 2025
1 check passed
@jun-he jun-he deleted the jun/improve-queue branch September 5, 2025 21:25
derek-miller pushed a commit to derek-miller/maestro that referenced this pull request Mar 4, 2026
* Use @Jacksonized annotation to reduce necessary jackson annotations on models

* Use caffeine cache for rule parsing

Also did some minor cleanup with constants and package structure.
derek-miller pushed a commit to derek-miller/maestro that referenced this pull request Mar 4, 2026
* add a SEL function to check if a param exists and improve the SEL doc (Netflix#118)

* fix some warnings from the build (Netflix#120)

* add get workflow instances batch endpoint (Netflix#121)

* Add a SEL function to get the value of error retries. Also add one example of k8s step to set env. (Netflix#123)

* Add a Redis-based InstanceStepConcurrencyHandler implementation. (Netflix#125)

* Add a Redis-based InstanceStepConcurrencyHandler implementation.

* address the comment

* Add postgres based Maestro persistence layer to support postgressql db. (Netflix#119)

* Add postgres based Maestro persistence layer to support postgressql db.

* Update indexed columns to use the COLLATE "C".

* Make this minor update to use JSON instead JSONB to address some tricky cases.
Note that Postgres JSONB will not preserve the original order of map field.
It might cause troubles in certain cases, e.g. param map order.

* update the dependency lock

* address the comments

* Improve Maestro to retry certain db errors, e.g. connection is closed. (Netflix#127)

* Improve Maestro to retry certain db errors, e.g. connection is closed.

* fix null pointer issue in the set contains call

* Update delete queue config to match the deletion processor timeout. (Netflix#130)

* Update delete queue config to match the deletion processor timeout.

* address the comment

* Explicitly clear the pending action after passing it to the step runtime. (Netflix#131)

* Explicitly clear the pending action after passing it to the step runtime.

* Add tests to verify that the pending action is reset.

* Add comments for the config.

* Improve the test a bit.

* Fix edge case where unblocking workflow doesn't enqueue job event (Netflix#132)

* Fix edge case where unblocking workflow doesn't enqueue job event

There is an edge case when we unblock all failed instances in batches and default batch size = 100, the only failed instance was unblocked in first while loop but didn't enqueue a job event because the condition requires unblocked count > 0 (in 1st loop this value is not updated and is still 0). This PR updated the if condition to account for this case.

* improve comment

* Check if string map param is literal before returning the value. (Netflix#133)

* Check if string map param is literal before returning the value.

* Add additional endpoints to workflow controller. (Netflix#136)

* Check if map param is literal before returning the value. (Netflix#140)

* Check if map param is literal before returning the value.

* address the comment

* Maestro workers should not retry in certain cases (Netflix#141)

* While a worker processes a job, the business logic might throw MaestroNotFoundException, e.g. the workflow is deleted. The worker should not retry in this case.

* address the comment

* fix a corner case during launching a subworkflow if the subworkflow instance already exists (Netflix#142)

* getAction logic should not pick up the upstream actions in async execution mode (Netflix#143)

* Subworkflow should wake up its child workflow than itself. (Netflix#144)

* Improve step runtime to support setting the next polling delay. (Netflix#145)

* Invalid job event should be removed from the queue. (Netflix#146)

* Invalid job event should be removed from the queue.

* address the comment

* Improve the queue performance by adding an explicit lock mechanism (Netflix#129)

* Improve the queue performance by adding an explicit lock mechanism because SKIP LOCKED does not perform well if there are a large number of rows.

* Use a dedicated queue_lock table for locking queue id.

* update dependencies field to JSONB so to support first_value() (Netflix#148)

* Add while step runtime support (Netflix#147)

* add while step runtime to support while loop

* address the comments

* fix batch loading rollup

* Support waking up flow engine with an action code. (Netflix#149)

* Support waking up flow engine with an action code.
Meaning of action codes are determined by the Task implementation.

* address the comment

* deduplicate actions to avoid unnecessary executions in the flow engine (Netflix#150)

* Implement tag permit support for Maestro. (Netflix#151)

* Implement tag permit support for Maestro.

* add extra metrics and small improvements related to tag permit feature (Netflix#152)

* Jun/unblock event (Netflix#153)

* fix missing action and message fields in the unblock event

* upgrade dependency locks

* Fix an issue for workflow timeout action: when the workflow is timed out, both the workflow instance and its steps should be marked as timeout.

* Slightly improve the code readability with additional comments. (Netflix#155)

* Fix the http status code in MaestroRuntimeException. (Netflix#156)

* Improve the code and a unit test (Netflix#157)

* calling Array.free() to release resources in loop cases (Netflix#159)

* Improve actors to cancel the scheduled task ping action during activate/wakeup (Netflix#160)

* Actor improvement: during activate call, cancel any existing scheduled task ping as activate will schedule a new ping.

* Improve the code a bit and add extra unit tests.

* fix tag permit release for queued cases (Netflix#161)

* Support extension function in SEL Util class. (Netflix#162)

* Support extension function in SEL Util class.
* Also add a toJson extension function to Maestro SEL expression evaluator.
* Address the comments.

* Rename testcontainerPostgresDep to testcontainerDep in all build.gradle files

To match the renamed dependency in dependencies.gradle after OSS merge

* Regenerate all gradle lockfiles after OSS merge

Generated by running: ./gradlew build --write-locks -x test -x integrationTest

* Add db-type and ownership-timeout properties to production config

- Add db-type: postgres to engine.configs (required by OSS)
- Add ownership-timeout: 125000 to queue 5 for deletion processor
- Tag permit features left disabled (not needed for Airbnb deployment)

* Add OSS sync script for merging upstream changes

- Prompts for branch name to avoid hardcoding usernames
- Fetches latest from origin and oss remotes
- Creates branch from origin/main and merges oss/master
- Lists conflicts if any, with helpful next steps
- Auto-commits if no conflicts detected

---------

Co-authored-by: jun-he <jun-he@users.noreply.github.com>
Co-authored-by: yingyiz-netflix <yingyiz@netflix.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants