split KillUnusedSegmentsTask to processing in smaller chunks #14642

Merged
merged 10 commits on Jul 31, 2023
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
@@ -60,6 +61,10 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

// We split this to try to keep each nuke operation relatively short, in case either
// the database or the storage layer is particularly slow.
private static final int SEGMENT_NUKE_BATCH_SIZE = 10_000;

private final boolean markAsUnused;

@JsonCreator
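
As an aside (not part of this diff), a minimal sketch of how Guava's Lists.partition chunks a list into batches of at most SEGMENT_NUKE_BATCH_SIZE elements; the class name and the integer ids below are made-up stand-ins for segments:

import com.google.common.collect.Lists;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionSketch
{
  private static final int SEGMENT_NUKE_BATCH_SIZE = 10_000;

  public static void main(String[] args)
  {
    // Stand-in for the retrieved unused segments: 25,000 dummy ids.
    final List<Integer> allUnusedSegments = IntStream.range(0, 25_000)
        .boxed()
        .collect(Collectors.toList());

    // Lists.partition returns consecutive sublist views, each holding at most
    // SEGMENT_NUKE_BATCH_SIZE elements; only the final batch may be smaller.
    final List<List<Integer>> batches = Lists.partition(allUnusedSegments, SEGMENT_NUKE_BATCH_SIZE);

    System.out.println(batches.size());        // 3
    System.out.println(batches.get(0).size()); // 10000
    System.out.println(batches.get(2).size()); // 5000
  }
}
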
@@ -114,23 +119,37 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

// List unused segments
final List<DataSegment> unusedSegments = toolbox
final List<DataSegment> allUnusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, SEGMENT_NUKE_BATCH_SIZE);

// The individual activities here on the toolbox can run for a long time, since they involve calls to
// metadata storage and archival object storage. They also hold the overlord task lockbox while they run.
// By splitting the segment list into smaller batches, we get an opportunity to yield the lock between
// batches to other activity that needs the overlord task lockbox.

for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

// Kill segments:
// Order is important here: we want the nuke action to clean up the metadata records _before_ the
// segments are removed from storage. This helps ensure that we always have a storage segment if
// the metadata segment is present. If the segment nuke throws an exception, the segment cleanup is
// abandoned.

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
}

// Kill segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);

return TaskStatus.success(getId());
}
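
Condensed, the new loop follows this pattern. The sketch below is hypothetical: MetadataStore and DeepStorageKiller are illustrative stand-ins rather than Druid APIs, and it only demonstrates the batching and the ordering (metadata nuke first, then deep-storage delete):

import com.google.common.collect.Lists;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BatchedKillSketch
{
  // Illustrative stand-in for the metadata nuke action.
  interface MetadataStore
  {
    void nuke(Set<String> segmentIds);
  }

  // Illustrative stand-in for the archival storage killer.
  interface DeepStorageKiller
  {
    void kill(List<String> segmentIds);
  }

  private static final int BATCH_SIZE = 10_000;

  static void killInBatches(List<String> allUnusedSegments, MetadataStore metadata, DeepStorageKiller storage)
  {
    // Small batches keep each metadata + storage round trip short, so other
    // work that needs the same locks can interleave between batches.
    for (final List<String> batch : Lists.partition(allUnusedSegments, BATCH_SIZE)) {
      // Metadata first: if this throws, the storage delete below is skipped,
      // so a segment that still has a metadata record also still has its files.
      metadata.nuke(new HashSet<>(batch));
      storage.kill(batch);
    }
  }
}
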
