Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve concurrency in TaskQueue #17828

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

kfaraz
Copy link
Contributor

@kfaraz kfaraz commented Mar 23, 2025

Description

The giant lock in TaskQueue is acquired when performing any update or read operation, such as:

  • add a task
  • remove a task
  • read the list of tasks
  • sync from storage

On large clusters with several concurrent tasks, this can become a bottleneck.

This patch attempts to improve the concurrency in TaskQueue by using a concurrent hash map instead.

Changes

  • Use a ConcurrentHashMap to track active tasks. The giant lock was needed only to handle
    competing updates made to the same task ID. This can be handled by a ConcurrentHashMap too.
  • Perform any update on the entry for a task ID within ConcurrentHashMap.compute to ensure atomicity
  • Convert the giant lock from a ReentrantLock to a ReentrantReadWriteLock
  • Repurpose giant lock to ensure that the TaskQueue start and stop is mutually exclusive from any other
    normal operation on the TaskQueue
  • Keep a lastUpdatedTime in every TaskEntry to handle race conditions when syncing from metadata store

Handling race conditions

The only possible race conditions are with syncFromStorage(),
and are handled by maintaining a lastUpdatedTime for every task entry.

All updates to a task entry happen inside a ConcurrentHashMap.compute(taskId, entry -> update(entry))
and are thus thread-safe.

[A] Sync should not add a task to queue if it has just been removed

This is handled by ensuring that only the syncFromStorage() method can remove tasks from the queue.
A task that completed after the poll started would have a lastUpdatedTime after syncStartTime.
In this case, since an entry already exists in the queue, a new one will not be added.
The next invocation of syncFromStorage() will add the task, if necessary.

[B] Sync should not remove a task from queue if it has just been added

A task added after the poll started would have a lastUpdatedTime after syncStartTime.
In this case, the entry will not be removed.
The next invocation of syncFromStorage() will clean up the task, if necessary.

Pending

  • Unit tests for concurrency
  • Cluster testing

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Copy link
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, feel free to merge after considering the comment about the name of the giant lock.

* queue management is in progress. {@link #start} and {@link #stop} methods
* acquire a WRITE lock whereas all other operations acquire a READ lock.
*/
private final ReentrantReadWriteLock giant = new ReentrantReadWriteLock(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming to startStopLock, since its main purpose now is to synchronize start and stop with each other and with the other operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks for the review.

@@ -280,18 +282,17 @@ public void start()
@LifecycleStop
public void stop()
{
giant.lock();
giant.writeLock().lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good we we add tests triggering this behavior ie
one threads is hitting start/stop method while the other is trying to insert tasks to activeTasks

@@ -4297,7 +4297,7 @@ private Map<String, Task> getActiveTaskMap()
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
final List<Task> tasks;
if (taskQueue.isPresent()) {
tasks = taskQueue.get().getActiveTasksForDatasource(dataSource);
return taskQueue.get().getActiveTasksForDatasource(dataSource);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably remove line 4296 and 4298 and fold them inside the else at 4301. The resulting method would be much leaner.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants