Add ability to limit the number of segments killed in kill task (#14662)
### Description

Previously, the `maxSegments` value configured for auto kill could be ignored: if an interval of a given datasource contained more than `maxSegments` unused segments, the kill task spawned to delete the unused segments in that interval would delete more than the configured `maxSegments`. Now each kill task spawned by the auto-kill coordinator duty kills at most `limit` segments. This is done by adding a new `limit` config property to `KillUnusedSegmentsTask`, which allows users to cap the number of segments a single task deletes.
zachjsh authored Aug 4, 2023
1 parent 7488744 commit ba957a9
Showing 25 changed files with 526 additions and 74 deletions.
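For illustration, the coordinator-side query that spawns such a capped kill task (round-tripped by the serde tests in this commit) could be built as follows; a hedged sketch where the task id, datasource, and values are placeholders:

```java
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.java.util.common.Intervals;

// Kill at most 500 unused segments in the interval, 100 per batch.
// markAsUnused must be false (or null) whenever a limit is set.
ClientKillUnusedSegmentsTaskQuery query = new ClientKillUnusedSegmentsTaskQuery(
    "killTaskId",                    // task id (placeholder)
    "wikipedia",                     // datasource (placeholder)
    Intervals.of("2020-01-01/P1D"),  // interval to clean up
    false,                           // markAsUnused
    100,                             // batchSize
    500                              // limit
);
```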
9 changes: 6 additions & 3 deletions docs/data-management/delete.md
@@ -96,15 +96,18 @@ The available grammar is:
```json
{
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>,
"batchSize": <optional_batch size>
"batchSize": <optional_batch size>,
"limit": <the maximum number of segments to delete>
}
```

Some of the parameters used in the task payload are further explained below:

| Parameter | Default | Explanation |
|-------------|-----------------|-------------|
| `batchSize` | 100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations. |
| `limit` | null (no limit) | Maximum number of segments for the kill task to delete. See the example payload below. |
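
For example, the following payload (the id, datasource, and values are illustrative) kills at most 500 unused segments from the interval, 100 per batch; it is submitted like any other task via `POST /druid/indexer/v1/task` on the Overlord:

```json
{
  "type": "kill",
  "id": "kill_wikipedia_2023",
  "dataSource": "wikipedia",
  "interval": "2020-01-01/2021-01-01",
  "batchSize": 100,
  "limit": 500
}
```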


**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
deep storage. This operation cannot be undone.
@@ -27,6 +27,8 @@
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;

import java.util.List;

public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment>>
@@ -37,14 +39,19 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment>>
@JsonIgnore
private final Interval interval;

@JsonIgnore
private final Integer limit;

@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("limit") @Nullable Integer limit
)
{
this.dataSource = dataSource;
this.interval = interval;
this.limit = limit;
}

@JsonProperty
@@ -59,6 +66,13 @@ public Interval getInterval()
return interval;
}

@Nullable
@JsonProperty
public Integer getLimit()
{
return limit;
}

@Override
public TypeReference<List<DataSegment>> getReturnTypeReference()
{
@@ -68,7 +82,8 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
}

@Override
@@ -83,6 +98,7 @@ public String toString()
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", limit=" + limit +
'}';
}
}
@@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
@@ -26,8 +26,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,13 +37,16 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -61,6 +64,7 @@
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
public static final String TYPE = "kill";
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

/**
@@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

private final boolean markAsUnused;

/**
* Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;
@Nullable private final Integer limit;


// counters included primarily for testing
private int numSegmentsKilled = 0;
private long numBatchesProcessed = 0;

@JsonCreator
Expand All @@ -90,8 +96,9 @@ public KillUnusedSegmentsTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit
)
{
super(
@@ -103,6 +110,19 @@
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
if (null != limit && limit <= 0) {
throw InvalidInput.exception(
"limit [%d] is invalid. It must be a positive integer.",
limit
);
}
if (limit != null && markAsUnused != null && markAsUnused) {
throw InvalidInput.exception(
"limit cannot be provided with markAsUnused.",
limit
);
}
this.limit = limit;
}
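
The guards above reject a non-positive `limit` and the combination of a `limit` with `markAsUnused`. A hedged, test-style sketch of accepted and rejected combinations (the seven-argument order, ids, datasource, and interval are assumed for illustration):

```java
// Assumed constructor order: (id, dataSource, interval, context, markAsUnused, batchSize, limit).
// "kill1", "wikipedia", and the interval are placeholder values.
Interval interval = Intervals.of("2020-01-01/2021-01-01");

new KillUnusedSegmentsTask("kill1", "wikipedia", interval, null, null, 100, 10); // accepted
new KillUnusedSegmentsTask("kill2", "wikipedia", interval, null, null, 100, 0);  // throws: limit must be positive
new KillUnusedSegmentsTask("kill3", "wikipedia", interval, null, true, 100, 10); // throws: limit with markAsUnused
```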

@JsonProperty
Expand All @@ -119,10 +139,17 @@ public int getBatchSize()
return batchSize;
}

@Nullable
@JsonProperty
public Integer getLimit()
{
return limit;
}

@Override
public String getType()
{
return "kill";
return TYPE;
}

@Nonnull
@@ -140,6 +167,13 @@ long getNumBatchesProcessed()
return numBatchesProcessed;
}

@JsonIgnore
@VisibleForTesting
long getNumSegmentsKilled()
{
return numSegmentsKilled;
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
@@ -153,27 +187,29 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

// The individual activities here on the toolbox have the possibility to run for a longer period of time,
// since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the
// task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
// lock to other activity that might need to happen using the overlord tasklockbox.

int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
LOG.info(
"Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
batchSize,
limit,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);
do {
if (nextBatchSize <= 0) {
break;
}
unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

@@ -186,19 +222,40 @@
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;
numSegmentsKilled += unusedSegments.size();

LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());

nextBatchSize = computeNextBatchSize(numSegmentsKilled);
} while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));

LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments "
+ "in [%d] batches.",
getId(),
getDataSource(),
getInterval(),
numSegmentsKilled,
numBatchesProcessed
);

return TaskStatus.success(getId());
}

@JsonIgnore
@VisibleForTesting
@Nullable
Integer getNumTotalBatches()
{
return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
}

@JsonIgnore
@VisibleForTesting
int computeNextBatchSize(int numSegmentsKilled)
{
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
}
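
To make the batch arithmetic concrete, here is a hedged walk-through of the two helpers above with illustrative values:

```java
// With limit = 250 and batchSize = 100:
int numTotalBatches = (int) Math.ceil((double) 250 / 100); // getNumTotalBatches() -> 3

// computeNextBatchSize(numSegmentsKilled) = min(limit - killed, batchSize):
int batch1 = Math.min(250 - 0, 100);   // 100 segments
int batch2 = Math.min(250 - 100, 100); // 100 segments
int batch3 = Math.min(250 - 200, 100); // 50 segments

// After the third batch, numBatchesProcessed == numTotalBatches, so the
// do-while loop in runTask() exits; with no limit it instead runs until
// a retrieval returns an empty list.
```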

private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
@@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
@@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
@@ -61,7 +61,7 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "kill", value = KillUnusedSegmentsTask.class),
@Type(name = KillUnusedSegmentsTask.TYPE, value = KillUnusedSegmentsTask.class),
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
@@ -104,7 +104,7 @@ public void testRetrieveUsedSegmentsAction()
@Test
public void testRetrieveUnusedSegmentsAction()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
@@ -51,8 +51,9 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask()
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
false,
99,
5
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
@@ -61,6 +62,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask()
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());

}

@Test
@@ -71,6 +74,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
"datasource",
Intervals.of("2020-01-01/P1D"),
true,
null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -80,6 +84,7 @@
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
}

@Test
@@ -91,7 +96,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery()
Intervals.of("2020-01-01/P1D"),
null,
true,
99,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
Expand All @@ -103,5 +109,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
}
}