Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Maintain TaskLockbox at datasource level for higher concurrency #17390

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.GlobalTaskLockbox;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class SegmentAllocationQueue

private final long maxWaitTimeMillis;

private final TaskLockbox taskLockbox;
private final GlobalTaskLockbox taskLockbox;
private final IndexerMetadataStorageCoordinator metadataStorage;
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final ServiceEmitter emitter;
Expand All @@ -91,7 +91,7 @@ public class SegmentAllocationQueue

@Inject
public SegmentAllocationQueue(
TaskLockbox taskLockbox,
GlobalTaskLockbox taskLockbox,
TaskLockConfig taskLockConfig,
IndexerMetadataStorageCoordinator metadataStorage,
ServiceEmitter emitter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.overlord.GlobalTaskLockbox;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskStorage;
Expand All @@ -33,7 +33,7 @@

public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
private final GlobalTaskLockbox taskLockbox;
private final TaskStorage taskStorage;
private final SegmentAllocationQueue segmentAllocationQueue;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
Expand All @@ -44,7 +44,7 @@ public class TaskActionToolbox

@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
GlobalTaskLockbox taskLockbox,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
SegmentAllocationQueue segmentAllocationQueue,
Expand All @@ -63,7 +63,7 @@ public TaskActionToolbox(
}

public TaskActionToolbox(
TaskLockbox taskLockbox,
GlobalTaskLockbox taskLockbox,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter,
Expand All @@ -82,7 +82,7 @@ public TaskActionToolbox(
);
}

public TaskLockbox getTaskLockbox()
public GlobalTaskLockbox getTaskLockbox()
{
return taskLockbox;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.GlobalTaskLockbox;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.metadata.ReplaceTaskLock;
Expand All @@ -52,7 +52,7 @@ public class TaskLocks
{
static void checkLockCoversSegments(
final Task task,
final TaskLockbox taskLockbox,
final GlobalTaskLockbox taskLockbox,
final Collection<DataSegment> segments
)
{
Expand All @@ -69,7 +69,7 @@ static void checkLockCoversSegments(
@VisibleForTesting
static boolean isLockCoversSegments(
final Task task,
final TaskLockbox taskLockbox,
final GlobalTaskLockbox taskLockbox,
final Collection<DataSegment> segments
)
{
Expand Down Expand Up @@ -176,7 +176,7 @@ public static TaskLockType determineLockTypeForAppend(
*/
public static Map<DataSegment, ReplaceTaskLock> findReplaceLocksCoveringSegments(
final String datasource,
final TaskLockbox taskLockbox,
final GlobalTaskLockbox taskLockbox,
final Set<DataSegment> segments
)
{
Expand Down Expand Up @@ -206,7 +206,7 @@ public static Map<DataSegment, ReplaceTaskLock> findReplaceLocksCoveringSegments

public static List<TaskLock> findLocksForSegments(
final Task task,
final TaskLockbox taskLockbox,
final GlobalTaskLockbox taskLockbox,
final Collection<DataSegment> segments
)
{
Expand Down Expand Up @@ -245,7 +245,7 @@ public static List<TaskLock> findLocksForSegments(
return found;
}

private static NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskLockbox taskLockbox, Task task)
private static NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(GlobalTaskLockbox taskLockbox, Task task)
{
final List<TaskLock> taskLocks = taskLockbox.findLocksForTask(task);
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
* This class represents a critical action must be done while the task's lock is guaranteed to not be revoked in the
* middle of the action.
*
* Implementations must not change the lock state by calling {@link TaskLockbox#lock)}, {@link TaskLockbox#tryLock)},
* or {@link TaskLockbox#unlock(Task, Interval)}.
* Implementations must not change the lock state by calling {@link GlobalTaskLockbox#lock)}, {@link GlobalTaskLockbox#tryLock)},
* or {@link GlobalTaskLockbox#unlock(Task, Interval)}.
*
* Also, implementations should be finished as soon as possible because all methods in {@link TaskLockbox} are blocked
* Also, implementations should be finished as soon as possible because all methods in {@link GlobalTaskLockbox} are blocked
* until this action is finished.
*
* @see TaskLockbox#doInCriticalSection
* @see GlobalTaskLockbox#doInCriticalSection
*/
public class CriticalAction<T>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public DruidOverlord(
final TaskLockConfig taskLockConfig,
final TaskQueueConfig taskQueueConfig,
final DefaultTaskConfig defaultTaskConfig,
final TaskLockbox taskLockbox,
final GlobalTaskLockbox taskLockbox,
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode selfNode,
Expand Down
Loading
Loading