-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve concurrency in TaskQueue #17828
base: master
Are you sure you want to change the base?
Conversation
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
Fixed
Show fixed
Hide fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, feel free to merge after considering the comment about the name of the giant
lock.
* queue management is in progress. {@link #start} and {@link #stop} methods | ||
* acquire a WRITE lock whereas all other operations acquire a READ lock. | ||
*/ | ||
private final ReentrantReadWriteLock giant = new ReentrantReadWriteLock(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming to startStopLock
, since its main purpose now is to synchronize start
and stop
with each other and with the other operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Thanks for the review.
@@ -280,18 +282,17 @@ public void start() | |||
@LifecycleStop | |||
public void stop() | |||
{ | |||
giant.lock(); | |||
giant.writeLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good we we add tests triggering this behavior ie
one threads is hitting start/stop method while the other is trying to insert tasks to activeTasks
@@ -4297,7 +4297,7 @@ private Map<String, Task> getActiveTaskMap() | |||
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue(); | |||
final List<Task> tasks; | |||
if (taskQueue.isPresent()) { | |||
tasks = taskQueue.get().getActiveTasksForDatasource(dataSource); | |||
return taskQueue.get().getActiveTasksForDatasource(dataSource); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably remove line 4296 and 4298 and fold them inside the else at 4301. The resulting method would be much leaner.
Description
The
giant
lock inTaskQueue
is acquired when performing any update or read operation, such as:On large clusters with several concurrent tasks, this can become a bottleneck.
This patch attempts to improve the concurrency in
TaskQueue
by using a concurrent hash map instead.Changes
ConcurrentHashMap
to track active tasks. The giant lock was needed only to handlecompeting updates made to the same task ID. This can be handled by a
ConcurrentHashMap
too.ConcurrentHashMap.compute
to ensure atomicitygiant
lock from aReentrantLock
to aReentrantReadWriteLock
giant
lock to ensure that theTaskQueue
start
andstop
is mutually exclusive from any othernormal operation on the
TaskQueue
lastUpdatedTime
in everyTaskEntry
to handle race conditions when syncing from metadata storeHandling race conditions
The only possible race conditions are with
syncFromStorage()
,and are handled by maintaining a
lastUpdatedTime
for every task entry.All updates to a task entry happen inside a
ConcurrentHashMap.compute(taskId, entry -> update(entry))
and are thus thread-safe.
[A] Sync should not add a task to queue if it has just been removed
This is handled by ensuring that only the
syncFromStorage()
method can remove tasks from the queue.A task that completed after the poll started would have a
lastUpdatedTime
aftersyncStartTime
.In this case, since an entry already exists in the queue, a new one will not be added.
The next invocation of
syncFromStorage()
will add the task, if necessary.[B] Sync should not remove a task from queue if it has just been added
A task added after the poll started would have a
lastUpdatedTime
aftersyncStartTime
.In this case, the entry will not be removed.
The next invocation of
syncFromStorage()
will clean up the task, if necessary.Pending
This PR has: