From ba957a9b97b6bdbecb166db771721cced1dac365 Mon Sep 17 00:00:00 2001
From: zachjsh
Date: Thu, 3 Aug 2023 22:17:04 -0400
Subject: [PATCH] Add ability to limit the number of segments killed in kill
 task (#14662)

### Description

Previously, the `maxSegments` value configured for auto kill could be ignored: if an interval of data for a given datasource contained more unused segments than `maxSegments`, the kill task spawned to delete the unused segments in that interval would delete all of them. Now each kill task spawned by the auto-kill coordinator duty kills at most `limit` segments. This is done by adding a new `limit` property to `KillUnusedSegmentsTask` that lets users cap the number of segments a single kill task deletes.

---
 docs/data-management/delete.md                |   9 +-
 .../actions/RetrieveUnusedSegmentsAction.java |  20 ++-
 .../indexing/common/task/ArchiveTask.java     |   2 +-
 .../common/task/KillUnusedSegmentsTask.java   | 119 +++++++++----
 .../druid/indexing/common/task/MoveTask.java  |   2 +-
 .../indexing/common/task/RestoreTask.java     |   2 +-
 .../druid/indexing/common/task/Task.java      |   2 +-
 .../actions/RetrieveSegmentsActionsTest.java  |   2 +-
 ...tKillUnusedSegmentsTaskQuerySerdeTest.java |  13 +-
 .../task/KillUnusedSegmentsTaskTest.java      | 144 +++++++++++++++-
 .../indexing/overlord/TaskLifecycleTest.java  | 101 +++++++++++-
 ...TestIndexerMetadataStorageCoordinator.java |  17 +++
 .../ClientKillUnusedSegmentsTaskQuery.java    |  23 ++-
 .../IndexerMetadataStorageCoordinator.java    |  19 ++-
 .../IndexerSQLMetadataStorageCoordinator.java |  12 +-
 .../metadata/SqlSegmentsMetadataManager.java  |   2 +-
 .../metadata/SqlSegmentsMetadataQuery.java    |  17 ++-
 .../druid/rpc/indexing/OverlordClient.java    |  33 +++-
 .../coordinator/duty/KillUnusedSegments.java  |  10 +-
 .../server/http/DataSourcesResource.java      |   2 +-
 ...ClientKillUnusedSegmentsTaskQueryTest.java |  18 ++-
 ...exerSQLMetadataStorageCoordinatorTest.java |  16 ++
 .../rpc/indexing/OverlordClientImplTest.java  |   9 +-
 .../duty/KillUnusedSegmentsTest.java          |   4 +-
 .../server/http/DataSourcesResourceTest.java  |   2 +-
 25 files changed, 526 insertions(+), 74 deletions(-)

diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 260a66a17498..9e59c751bc2d 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -96,15 +96,18 @@ The available grammar is:
     "dataSource": <task_datasource>,
     "interval" : <all_unused_segments_in_this_interval_will_die!>,
     "context": <task context>,
-    "batchSize": <batch size>
+    "batchSize": <batch size>,
+    "limit": <maximum number of segments to delete>
 }
 ```
 
 Some of the parameters used in the task payload are further explained below:
 
-| Parameter |Default| Explanation |
-|-----------|-------|-------------|
+| Parameter   | Default         | Explanation |
+|-------------|-----------------|-------------|
 | `batchSize` | 100             | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations. |
+| `limit`     | null (no limit) | Maximum number of segments for the kill task to delete. |
 
 **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and deep storage. This operation cannot be undone.
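For illustration, here is a complete `kill` task payload that caps deletion at 1,000 segments; the task ID, datasource, and interval below are hypothetical:

```json
{
  "type": "kill",
  "id": "kill_wikipedia_2023",
  "dataSource": "wikipedia",
  "interval": "2022-01-01/2023-01-01",
  "batchSize": 100,
  "limit": 1000
}
```

With these values the task issues at most ceil(1000 / 100) = 10 nuke batches and then stops, even if more unused segments remain in the interval.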
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
index f114ec456465..150648858c15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
@@ -27,6 +27,8 @@ import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment>>
@@ -37,14 +39,19 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment>>
   @JsonIgnore
   private final String dataSource;
 
   @JsonIgnore
   private final Interval interval;
 
+  @JsonIgnore
+  @Nullable
+  private final Integer limit;
+
   @JsonCreator
   public RetrieveUnusedSegmentsAction(
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("limit") @Nullable Integer limit
   )
   {
     this.dataSource = dataSource;
     this.interval = interval;
+    this.limit = limit;
   }
@@ -57,6 +64,13 @@ public Interval getInterval()
     return interval;
   }
 
+  @JsonProperty
+  @Nullable
+  public Integer getLimit()
+  {
+    return limit;
+  }
+
   @Override
   public TypeReference<List<DataSegment>> getReturnTypeReference()
   {
@@ -68,7 +82,8 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
   @Override
   public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
   {
-    return toolbox.getIndexerMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(dataSource, interval);
+    return toolbox.getIndexerMetadataStorageCoordinator()
+                  .retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
   }
 
   @Override
@@ -83,6 +98,7 @@ public String toString()
     return getClass().getSimpleName() + "{" +
            "dataSource='" + dataSource + '\'' +
            ", interval=" + interval +
+           ", limit=" + limit +
            '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
index 904181a2431c..42d04316a41f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
@@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval()));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
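Before the `KillUnusedSegmentsTask` changes below, a brief sketch of how the new nullable `limit` parameter is used from a task. This is illustrative only: it assumes the snippet runs inside a task's `runTask(TaskToolbox toolbox)`, and the datasource and interval are made up.

```java
import java.util.List;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;

// Fetch at most 500 unused segments for the interval. Passing null instead of 500
// preserves the old unbounded behavior, which is exactly what ArchiveTask, MoveTask,
// and RestoreTask now do explicitly.
final List<DataSegment> unusedSegments = toolbox
    .getTaskActionClient()
    .submit(new RetrieveUnusedSegmentsAction("wikipedia", Intervals.of("2022/2023"), 500));
```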
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 0d54ae96b05d..35653aa2303f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -26,8 +26,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,6 +37,7 @@
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
@@ -44,6 +45,8 @@
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -61,6 +64,7 @@
  */
 public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
+  public static final String TYPE = "kill";
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
 
   /**
@@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
   private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
 
   private final boolean markAsUnused;
-
   /**
    * Split processing to try and keep each nuke operation relatively short, in the case that either
    * the database or the storage layer is particularly slow.
    */
   private final int batchSize;
+  @Nullable private final Integer limit;
+
-  // counter included primarily for testing
+  // counters included primarily for testing
+  private int numSegmentsKilled = 0;
   private long numBatchesProcessed = 0;
 
   @JsonCreator
@@ -90,8 +96,9 @@ public KillUnusedSegmentsTask(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("context") Map<String, Object> context,
-      @JsonProperty("markAsUnused") Boolean markAsUnused,
-      @JsonProperty("batchSize") Integer batchSize
+      @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize,
+      @JsonProperty("limit") @Nullable Integer limit
   )
   {
     super(
@@ -103,6 +110,19 @@ public KillUnusedSegmentsTask(
     this.markAsUnused = markAsUnused != null && markAsUnused;
     this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
     Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
+    if (null != limit && limit <= 0) {
+      throw InvalidInput.exception(
+          "limit [%d] is invalid. It must be a positive integer.",
+          limit
+      );
+    }
+    if (limit != null && markAsUnused != null && markAsUnused) {
+      throw InvalidInput.exception(
+          "limit [%d] cannot be provided with markAsUnused.",
+          limit
+      );
+    }
+    this.limit = limit;
   }
 
   @JsonProperty
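A quick sketch of the validation the constructor hunk above enforces, JUnit-style; it assumes `InvalidInput.exception` surfaces as a `DruidException`, and the argument order matches the constructor (id, dataSource, interval, context, markAsUnused, batchSize, limit):

```java
// A non-positive limit is rejected at construction time...
Assert.assertThrows(
    DruidException.class,
    () -> new KillUnusedSegmentsTask(null, "wikipedia", Intervals.of("2022/2023"), null, false, null, 0)
);
// ...and so is a limit combined with markAsUnused = true.
Assert.assertThrows(
    DruidException.class,
    () -> new KillUnusedSegmentsTask(null, "wikipedia", Intervals.of("2022/2023"), null, true, null, 10)
);
```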
@@ -119,10 +139,17 @@ public int getBatchSize()
     return batchSize;
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getLimit()
+  {
+    return limit;
+  }
+
   @Override
   public String getType()
   {
-    return "kill";
+    return TYPE;
   }
 
   @Nonnull
@@ -140,6 +167,13 @@ long getNumBatchesProcessed()
     return numBatchesProcessed;
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  long getNumSegmentsKilled()
+  {
+    return numSegmentsKilled;
+  }
+
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
@@ -153,27 +187,29 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     }
 
     // List unused segments
-    final List<DataSegment> allUnusedSegments = toolbox
-        .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));
-
-    final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);
-
-    // The individual activities here on the toolbox have possibility to run for a longer period of time,
-    // since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the
-    // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
-    // lock to other activity that might need to happen using the overlord tasklockbox.
-
-    LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches (batchSize = [%d]).",
-        getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize);
+    int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+    @Nullable Integer numTotalBatches = getNumTotalBatches();
+    List<DataSegment> unusedSegments;
+    LOG.info(
+        "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
+        batchSize,
+        limit,
+        numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
+    );
+    do {
+      if (nextBatchSize <= 0) {
+        break;
+      }
+      unusedSegments = toolbox
+          .getTaskActionClient()
+          .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
 
-    for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
       if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
         throw new ISE(
-            "Locks[%s] for task[%s] can't cover segments[%s]",
-            taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
-            getId(),
-            unusedSegments
+             "Locks[%s] for task[%s] can't cover segments[%s]",
+             taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+             getId(),
+             unusedSegments
         );
       }
@@ -186,19 +222,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
       toolbox.getDataSegmentKiller().kill(unusedSegments);
       numBatchesProcessed++;
+      numSegmentsKilled += unusedSegments.size();
 
-      if (numBatchesProcessed % 10 == 0) {
-        LOG.info("Processed [%d/%d] batches for kill task[%s].",
-            numBatchesProcessed, unusedSegmentBatches.size(), getId());
-      }
-    }
+      LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
 
-    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
-        getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());
+      nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+    } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
+
+    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments "
+             + "in [%d] batches.",
+        getId(),
+        getDataSource(),
+        getInterval(),
+        numSegmentsKilled,
+        numBatchesProcessed
+    );
 
     return TaskStatus.success(getId());
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  @Nullable
+  Integer getNumTotalBatches()
+  {
+    return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  int computeNextBatchSize(int numSegmentsKilled)
+  {
+    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
+  }
+
   private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
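The batch accounting above reduces to two small formulas. A standalone sketch of the same arithmetic, mirroring `getNumTotalBatches` and `computeNextBatchSize` (the class and method names here are illustrative):

```java
// Standalone sketch of the kill task's batch accounting.
final class KillBatchMath
{
  // Total batches when a limit is set: ceil(limit / batchSize).
  static int numTotalBatches(int limit, int batchSize)
  {
    return (int) Math.ceil((double) limit / batchSize);
  }

  // Size of the next retrieval: whatever remains under the limit, capped at batchSize.
  static int nextBatchSize(int limit, int batchSize, int numSegmentsKilled)
  {
    return Math.min(limit - numSegmentsKilled, batchSize);
  }

  public static void main(String[] args)
  {
    // batchSize = 3, limit = 10  ->  4 batches of sizes 3, 3, 3, 1.
    System.out.println(numTotalBatches(10, 3));  // 4
    System.out.println(nextBatchSize(10, 3, 9)); // 1
  }
}
```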
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
index 3e8b792eb885..d23b3820db74 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
@@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval()));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 5e91ad3375aa..1364bcb597fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval()));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 5c3186ef3923..58a2ad435b0f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -61,7 +61,7 @@
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @Type(name = "kill", value = KillUnusedSegmentsTask.class),
+    @Type(name = KillUnusedSegmentsTask.TYPE, value = KillUnusedSegmentsTask.class),
     @Type(name = "move", value = MoveTask.class),
     @Type(name = "archive", value =
ArchiveTask.class), @Type(name = "restore", value = RestoreTask.class), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 6149208fc391..2d360dfeb521 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -104,7 +104,7 @@ public void testRetrieveUsedSegmentsAction() @Test public void testRetrieveUnusedSegmentsAction() { - final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL); + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUnusedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java index 70cd5fcf19b9..3ab6bae4688f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java @@ -51,8 +51,9 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro "killTaskId", "datasource", Intervals.of("2020-01-01/P1D"), - true, - 99 + false, + 99, + 5 ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class); @@ -61,6 +62,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize())); + Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit()); + } @Test @@ -71,6 +74,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault "datasource", Intervals.of("2020-01-01/P1D"), true, + null, null ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); @@ -80,6 +84,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(100, fromJson.getBatchSize()); + Assert.assertNull(taskQuery.getLimit()); } @Test @@ -91,7 +96,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro Intervals.of("2020-01-01/P1D"), null, true, - 99 + 99, + null ); final byte[] json = objectMapper.writeValueAsBytes(task); final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue( @@ -103,5 +109,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused()); 
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); + Assert.assertNull(task.getLimit()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index f57624ed7ddf..3a89c19a785a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -81,6 +81,7 @@ public void testKill() throws Exception Intervals.of("2019-03-01/2019-04-01"), null, false, + null, null ); @@ -97,7 +98,9 @@ public void testKill() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1L, task.getNumBatchesProcessed()); + + Assert.assertEquals(2L, task.getNumBatchesProcessed()); + Assert.assertEquals(1, task.getNumSegmentsKilled()); } @@ -128,6 +131,7 @@ public void testKillWithMarkUnused() throws Exception Intervals.of("2019-03-01/2019-04-01"), null, true, + null, null ); @@ -144,7 +148,8 @@ public void testKillWithMarkUnused() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1L, task.getNumBatchesProcessed()); + Assert.assertEquals(2L, task.getNumBatchesProcessed()); + Assert.assertEquals(1, task.getNumSegmentsKilled()); } @Test @@ -157,13 +162,14 @@ public void testGetInputSourceResources() Intervals.of("2019-03-01/2019-04-01"), null, true, + null, null ); Assert.assertTrue(task.getInputSourceResources().isEmpty()); } @Test - public void testKillBatchSizeOne() throws Exception + public void testKillBatchSizeOneAndLimit4() throws Exception { final String version = DateTimes.nowUtc().toString(); final Set segments = ImmutableSet.of( @@ -176,14 +182,23 @@ public void testKillBatchSizeOne() throws Exception Assert.assertEquals(segments, announced); + Assert.assertEquals( + segments.size(), + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01") + ) + ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( null, DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, - true, - 1 + false, + 1, + 4 ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); @@ -195,6 +210,7 @@ public void testKillBatchSizeOne() throws Exception Assert.assertEquals(Collections.emptyList(), unusedSegments); Assert.assertEquals(4L, task.getNumBatchesProcessed()); + Assert.assertEquals(4, task.getNumSegmentsKilled()); } @Test @@ -218,7 +234,8 @@ public void testKillBatchSizeThree() throws Exception Intervals.of("2018-01-01/2020-01-01"), null, true, - 3 + 3, + null ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); @@ -229,7 +246,120 @@ public void testKillBatchSizeThree() throws Exception getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); Assert.assertEquals(Collections.emptyList(), unusedSegments); - Assert.assertEquals(2L, task.getNumBatchesProcessed()); + Assert.assertEquals(3L, task.getNumBatchesProcessed()); + Assert.assertEquals(4, task.getNumSegmentsKilled()); + } + + @Test + public void testComputeNextBatchSizeDefault() + { + final KillUnusedSegmentsTask task = + new 
KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + null, + null + ); + Assert.assertEquals(100, task.computeNextBatchSize(50)); + } + + @Test + public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 10, + 5 + ); + Assert.assertEquals(5, task.computeNextBatchSize(0)); + } + + @Test + public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 5, + 10 + ); + Assert.assertEquals(5, task.computeNextBatchSize(0)); + } + + @Test + public void testComputeNextBatchSizeWithRemainingLessThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 5, + 10 + ); + Assert.assertEquals(3, task.computeNextBatchSize(7)); + } + + @Test + public void testGetNumTotalBatchesDefault() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + null, + null + ); + Assert.assertNull(task.getNumTotalBatches()); + } + + @Test + public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 10, + 5 + ); + Assert.assertEquals(1, (int) task.getNumTotalBatches()); + } + + @Test + public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 5, + 10 + ); + Assert.assertEquals(2, (int) task.getNumTotalBatches()); } private static DataSegment newSegment(Interval interval, String version) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 2ea7fcedf193..31ec645b3fdd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -949,6 +949,7 @@ public DataSegment apply(String input) Intervals.of("2011-04-01/P4D"), null, false, + null, null ); @@ -957,7 +958,7 @@ public DataSegment apply(String input) Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size()); - Assert.assertEquals("delete segment batch call count", 1, mdc.getDeleteSegmentsCount()); + Assert.assertEquals("delete segment batch call count", 2, mdc.getDeleteSegmentsCount()); Assert.assertTrue( "expected unused segments get killed", expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll( @@ -970,6 +971,104 @@ public DataSegment apply(String input) } } + @Test + public void testKillUnusedSegmentsTaskWithMaxSegmentsToKill() throws Exception + { + final File tmpSegmentDir = temporaryFolder.newFolder(); + + List expectedUnusedSegments = Lists.transform( + ImmutableList.of( + "2011-04-01/2011-04-02", + 
"2011-04-02/2011-04-03", + "2011-04-04/2011-04-05" + ), new Function() + { + @Override + public DataSegment apply(String input) + { + final Interval interval = Intervals.of(input); + try { + return DataSegment.builder() + .dataSource("test_kill_task") + .interval(interval) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + tmpSegmentDir.getCanonicalPath() + + "/druid/localStorage/wikipedia/" + + interval.getStart() + + "-" + + interval.getEnd() + + "/" + + "2011-04-6T16:52:46.119-05:00" + + "/0/index.zip" + ) + ) + .version("2011-04-6T16:52:46.119-05:00") + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(NoneShardSpec.instance()) + .binaryVersion(9) + .size(0) + .build(); + } + catch (IOException e) { + throw new ISE(e, "Error creating segments"); + } + } + } + ); + + mdc.setUnusedSegments(expectedUnusedSegments); + + // manually create local segments files + List segmentFiles = new ArrayList<>(); + for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) { + File file = new File((String) segment.getLoadSpec().get("path")); + FileUtils.mkdirp(file.getParentFile()); + Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY); + segmentFiles.add(file); + } + + final int maxSegmentsToKill = 2; + final Task killUnusedSegmentsTask = + new KillUnusedSegmentsTask( + null, + "test_kill_task", + Intervals.of("2011-04-01/P4D"), + null, + false, + null, + maxSegmentsToKill + ); + + final TaskStatus status = runTask(killUnusedSegmentsTask); + Assert.assertEquals(taskLocation, status.getLocation()); + Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); + Assert.assertEquals("num segments nuked", maxSegmentsToKill, mdc.getNuked().size()); + Assert.assertTrue( + "expected unused segments get killed", + expectedUnusedSegments.containsAll(mdc.getNuked()) + ); + + int expectedNumOfSegmentsRemaining = segmentFiles.size() - maxSegmentsToKill; + int actualNumOfSegmentsRemaining = 0; + for (File file : segmentFiles) { + if (file.exists()) { + actualNumOfSegmentsRemaining++; + } + } + + Assert.assertEquals( + "Expected of segments deleted did not match expectations", + expectedNumOfSegmentsRemaining, + actualNumOfSegmentsRemaining + ); + } + @Test public void testRealtimeishTask() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index c8bf8fd28ab1..66af0a973043 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator { @@ -110,6 +111,22 @@ public List retrieveUnusedSegmentsForInterval(String dataSource, In } } + @Override + public List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval, @Nullable Integer limit) + { + synchronized (unusedSegments) { + Stream resultStream = unusedSegments.stream(); + + resultStream = resultStream.filter(ds -> !nuked.contains(ds)); + + if (limit != null) { + resultStream = 
resultStream.limit(limit); + } + + return ImmutableList.copyOf(resultStream.iterator()); + } + } + @Override public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval) { diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java index 779c8acc7f28..3676d684097f 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java @@ -24,6 +24,8 @@ import com.google.common.base.Preconditions; import org.joda.time.Interval; +import javax.annotation.Nullable; + import java.util.Objects; /** @@ -40,14 +42,16 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery private final Interval interval; private final Boolean markAsUnused; private final Integer batchSize; + @Nullable private final Integer limit; @JsonCreator public ClientKillUnusedSegmentsTaskQuery( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, - @JsonProperty("markAsUnused") Boolean markAsUnused, - @JsonProperty("batchSize") Integer batchSize + @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, + @JsonProperty("batchSize") Integer batchSize, + @JsonProperty("limit") Integer limit ) { this.id = Preconditions.checkNotNull(id, "id"); @@ -55,6 +59,8 @@ public ClientKillUnusedSegmentsTaskQuery( this.interval = interval; this.markAsUnused = markAsUnused; this.batchSize = batchSize; + Preconditions.checkArgument(limit == null || limit > 0, "limit must be > 0"); + this.limit = limit; } @JsonProperty @@ -96,6 +102,14 @@ public Integer getBatchSize() return batchSize; } + @JsonProperty + @Nullable + public Integer getLimit() + { + return limit; + } + + @Override public boolean equals(Object o) { @@ -110,12 +124,13 @@ public boolean equals(Object o) && Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval) && Objects.equals(markAsUnused, that.markAsUnused) - && Objects.equals(batchSize, that.batchSize); + && Objects.equals(batchSize, that.batchSize) + && Objects.equals(limit, that.limit); } @Override public int hashCode() { - return Objects.hash(id, dataSource, interval, markAsUnused, batchSize); + return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit); } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index b3c70f0cdbe9..3d8f4b858657 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -123,17 +123,30 @@ Collection retrieveUsedSegmentsForIntervals( Segments visibility ); + /** + * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)} + */ + default List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval) + { + return retrieveUnusedSegmentsForInterval(dataSource, interval, null); + } + /** * Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the * metadata store. 
* - * @param dataSource The data source the segments belong to - * @param interval Filter the data segments to ones that include data in this interval exclusively. + * @param dataSource The data source the segments belong to + * @param interval Filter the data segments to ones that include data in this interval exclusively. + * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. * * @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT * returned here may include data in the interval */ - List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval); + List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable Integer limit + ); /** * Mark as unused segments which include ONLY data within the given interval. diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 107dc4b9f97a..6c4d523133a7 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -190,12 +190,22 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin @Override public List retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval) + { + return retrieveUnusedSegmentsForInterval(dataSource, interval, null); + } + + @Override + public List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable Integer limit + ) { final List matchingSegments = connector.inReadOnlyTransaction( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUnusedSegments(dataSource, Collections.singletonList(interval))) { + .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index d98c5d4d0861..46db26a56d6c 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -574,7 +574,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 13460e2695d0..45896a865ef9 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -37,6 +37,8 @@ import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.ResultIterator; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -104,7 +106,7 @@ public CloseableIterator 
retrieveUsedSegments( final Collection intervals ) { - return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true); + return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null); } /** @@ -118,10 +120,11 @@ public CloseableIterator retrieveUsedSegments( */ public CloseableIterator retrieveUnusedSegments( final String dataSource, - final Collection intervals + final Collection intervals, + @Nullable final Integer limit ) { - return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false); + return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit); } /** @@ -201,7 +204,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) // Retrieve, then drop, since we can't write a WHERE clause directly. final List segments = ImmutableList.copyOf( Iterators.transform( - retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true), + retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null), DataSegment::getId ) ); @@ -213,7 +216,8 @@ private CloseableIterator retrieveSegments( final String dataSource, final Collection intervals, final IntervalMode matchMode, - final boolean used + final boolean used, + @Nullable final Integer limit ) { // Check if the intervals all support comparing as strings. If so, bake them into the SQL. @@ -259,6 +263,9 @@ private CloseableIterator retrieveSegments( .setFetchSize(connector.getStreamingFetchSize()) .bind("used", used) .bind("dataSource", dataSource); + if (null != limit) { + sql.setMaxRows(limit); + } if (compareAsString) { final Iterator iterator = intervals.iterator(); diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 51b4323a11f4..b4391dc0a8ef 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -84,9 +84,40 @@ public interface OverlordClient * @return future with task ID */ default ListenableFuture runKillTask(String idPrefix, String dataSource, Interval interval) + { + return runKillTask(idPrefix, dataSource, interval, null); + } + + /** + * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}. + * + * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs + * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to + * monitor its progress through the {@link #taskStatus(String)} API. 
+ * + * @param idPrefix Descriptive prefix to include at the start of task IDs + * @param dataSource Datasource to kill + * @param interval Interval to kill + * @param maxSegmentsToKill The maximum number of segments to kill + * + * @return future with task ID + */ + default ListenableFuture runKillTask( + String idPrefix, + String dataSource, + Interval interval, + @Nullable Integer maxSegmentsToKill + ) { final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval); - final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false, null); + final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery( + taskId, + dataSource, + interval, + false, + null, + maxSegmentsToKill + ); return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> taskId); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index f4b6240b43a6..205947b00392 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -33,6 +33,8 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.List; @@ -130,7 +132,12 @@ private void killUnusedSegments(Collection dataSourcesToKill) } try { - FutureUtils.getUnchecked(overlordClient.runKillTask("coordinator-issued", dataSource, intervalToKill), true); + FutureUtils.getUnchecked(overlordClient.runKillTask( + "coordinator-issued", + dataSource, + intervalToKill, + maxSegmentsToKill + ), true); ++submittedTasks; } catch (Exception ex) { @@ -148,6 +155,7 @@ private void killUnusedSegments(Collection dataSourcesToKill) /** * Calculates the interval for which segments are to be killed in a datasource. 
*/ + @Nullable private Interval findIntervalForKill(String dataSource) { final DateTime maxEndTime = ignoreRetainDuration diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 4a00f61300c9..1e146736da13 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -342,7 +342,7 @@ public Response killUnusedSegmentsInInterval( } final Interval theInterval = Intervals.of(interval.replace('_', '/')); try { - FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued", dataSourceName, theInterval), true); + FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null), true); return Response.ok().build(); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java index af9b2c8ec11b..60edff930771 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java @@ -35,13 +35,21 @@ public class ClientKillUnusedSegmentsTaskQueryTest private static final Interval INTERVAL = new Interval(START, START.plus(1)); private static final Boolean MARK_UNUSED = true; private static final Integer BATCH_SIZE = 999; + private static final Integer LIMIT = 1000; ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery; @Before public void setUp() { - clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true, BATCH_SIZE); + clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery( + "killTaskId", + DATA_SOURCE, + INTERVAL, + true, + BATCH_SIZE, + LIMIT + ); } @After @@ -80,12 +88,18 @@ public void testGetBatchSize() Assert.assertEquals(BATCH_SIZE, clientKillUnusedSegmentsQuery.getBatchSize()); } + @Test + public void testGetLimit() + { + Assert.assertEquals(LIMIT, clientKillUnusedSegmentsQuery.getLimit()); + } + @Test public void testEquals() { EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class) .usingGetClass() - .withNonnullFields("id", "dataSource", "interval", "batchSize") + .withNonnullFields("id", "dataSource", "interval", "batchSize", "limit") .verify(); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 74e06bfb6645..444e159411e3 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -932,6 +932,22 @@ public void testSimpleUnusedList() throws IOException ); } + @Test + public void testSimpleUnusedListWithLimit() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + markAllSegmentsUnused(); + int limit = SEGMENTS.size() - 1; + Set retreivedUnusedSegments = ImmutableSet.copyOf( + coordinator.retrieveUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + limit + ) + ); + Assert.assertEquals(limit, retreivedUnusedSegments.size()); + 
Assert.assertTrue(SEGMENTS.containsAll(retreivedUnusedSegments)); + } @Test public void testUsedOverlapLow() throws IOException diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index c1e48c496ca1..5b9a88d58410 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -422,7 +422,14 @@ public void test_killPendingSegments() throws Exception public void test_taskPayload() throws ExecutionException, InterruptedException, JsonProcessingException { final String taskID = "taskId_1"; - final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null, null); + final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery( + taskID, + "test", + null, + null, + null, + null + ); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 8f43b36bbe59..039174eac7e0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -216,11 +216,13 @@ public void testMaxSegmentsToKill() private void runAndVerifyKillInterval(Interval expectedKillInterval) { + int limit = config.getCoordinatorKillMaxSegments(); target.run(params); Mockito.verify(overlordClient, Mockito.times(1)).runKillTask( ArgumentMatchers.anyString(), ArgumentMatchers.eq("DS1"), - ArgumentMatchers.eq(expectedKillInterval) + ArgumentMatchers.eq(expectedKillInterval), + ArgumentMatchers.eq(limit) ); } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 5dec3154d6e2..551151458546 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -593,7 +593,7 @@ public void testKillSegmentsInIntervalInDataSource() Interval theInterval = Intervals.of(interval.replace('_', '/')); OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class); - EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval)) + EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null)) .andReturn(Futures.immediateFuture(null)); EasyMock.replay(overlordClient, server);
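As a closing illustration, the client-side entry point after this change; the ID prefix, datasource, and interval here are hypothetical, and `overlordClient` is assumed to be an injected `OverlordClient`:

```java
// Ask the Overlord to kill at most 100 unused segments in the interval.
// The resolved future value is the spawned task's ID.
final String taskId = FutureUtils.getUnchecked(
    overlordClient.runKillTask("manual-issued", "wikipedia", Intervals.of("2022/2023"), 100),
    true
);
```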