diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md index 260a66a17498..9e59c751bc2d 100644 --- a/docs/data-management/delete.md +++ b/docs/data-management/delete.md @@ -96,15 +96,18 @@ The available grammar is: "dataSource": , "interval" : , "context": , - "batchSize": + "batchSize": , + "limit": } ``` Some of the parameters used in the task payload are further explained below: -| Parameter |Default| Explanation | -|--------------|-------|--------------------------------------------------------------------------------------------------------| +| Parameter | Default | Explanation | +|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.| +| `limit` | null - no limit | Maximum number of segments for the kill task to delete.| + **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and deep storage. This operation cannot be undone. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java index f114ec456465..150648858c15 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java @@ -27,6 +27,8 @@ import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; +import javax.annotation.Nullable; + import java.util.List; public class RetrieveUnusedSegmentsAction implements TaskAction> @@ -37,14 +39,19 @@ public class RetrieveUnusedSegmentsAction implements TaskAction> getReturnTypeReference() { @@ -68,7 +82,8 @@ public TypeReference> getReturnTypeReference() @Override public List perform(Task task, TaskActionToolbox toolbox) { - return toolbox.getIndexerMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(dataSource, interval); + return toolbox.getIndexerMetadataStorageCoordinator() + .retrieveUnusedSegmentsForInterval(dataSource, interval, limit); } @Override @@ -83,6 +98,7 @@ public String toString() return getClass().getSimpleName() + "{" + "dataSource='" + dataSource + '\'' + ", interval=" + interval + + ", limit=" + limit + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java index 904181a2431c..42d04316a41f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java @@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval())); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 0d54ae96b05d..35653aa2303f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -26,8 +26,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskToolbox; @@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; @@ -44,6 +45,8 @@ import org.joda.time.Interval; import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -61,6 +64,7 @@ */ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask { + public static final String TYPE = "kill"; private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class); /** @@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100; private final boolean markAsUnused; - /** * Split processing to try and keep each nuke operation relatively short, in the case that either * the database or the storage layer is particularly slow. */ private final int batchSize; + @Nullable private final Integer limit; + - // counter included primarily for testing + // counters included primarily for testing + private int numSegmentsKilled = 0; private long numBatchesProcessed = 0; @JsonCreator @@ -90,8 +96,9 @@ public KillUnusedSegmentsTask( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("context") Map context, - @JsonProperty("markAsUnused") Boolean markAsUnused, - @JsonProperty("batchSize") Integer batchSize + @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, + @JsonProperty("batchSize") Integer batchSize, + @JsonProperty("limit") @Nullable Integer limit ) { super( @@ -103,6 +110,19 @@ public KillUnusedSegmentsTask( this.markAsUnused = markAsUnused != null && markAsUnused; this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE; Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero"); + if (null != limit && limit <= 0) { + throw InvalidInput.exception( + "limit [%d] is invalid. It must be a positive integer.", + limit + ); + } + if (limit != null && markAsUnused != null && markAsUnused) { + throw InvalidInput.exception( + "limit cannot be provided with markAsUnused.", + limit + ); + } + this.limit = limit; } @JsonProperty @@ -119,10 +139,17 @@ public int getBatchSize() return batchSize; } + @Nullable + @JsonProperty + public Integer getLimit() + { + return limit; + } + @Override public String getType() { - return "kill"; + return TYPE; } @Nonnull @@ -140,6 +167,13 @@ long getNumBatchesProcessed() return numBatchesProcessed; } + @JsonIgnore + @VisibleForTesting + long getNumSegmentsKilled() + { + return numSegmentsKilled; + } + @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { @@ -153,27 +187,29 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception } // List unused segments - final List allUnusedSegments = toolbox - .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval())); - - final List> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize); - - // The individual activities here on the toolbox have possibility to run for a longer period of time, - // since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the - // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the - // lock to other activity that might need to happen using the overlord tasklockbox. - - LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches(batchSize = [%d]).", - getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize); + int nextBatchSize = computeNextBatchSize(numSegmentsKilled); + @Nullable Integer numTotalBatches = getNumTotalBatches(); + List unusedSegments; + LOG.info( + "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s", + batchSize, + limit, + numTotalBatches != null ? StringUtils.format(" in([%d] batches]).", numTotalBatches) : "." + ); + do { + if (nextBatchSize <= 0) { + break; + } + unusedSegments = toolbox + .getTaskActionClient() + .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize)); - for (final List unusedSegments : unusedSegmentBatches) { if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) { throw new ISE( - "Locks[%s] for task[%s] can't cover segments[%s]", - taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()), - getId(), - unusedSegments + "Locks[%s] for task[%s] can't cover segments[%s]", + taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()), + getId(), + unusedSegments ); } @@ -186,19 +222,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); toolbox.getDataSegmentKiller().kill(unusedSegments); numBatchesProcessed++; + numSegmentsKilled += unusedSegments.size(); - if (numBatchesProcessed % 10 == 0) { - LOG.info("Processed [%d/%d] batches for kill task[%s].", - numBatchesProcessed, unusedSegmentBatches.size(), getId()); - } - } + LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId()); - LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.", - getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size()); + nextBatchSize = computeNextBatchSize(numSegmentsKilled); + } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches)); + + LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments " + + "in [%d] batches.", + getId(), + getDataSource(), + getInterval(), + numSegmentsKilled, + numBatchesProcessed + ); return TaskStatus.success(getId()); } + @JsonIgnore + @VisibleForTesting + @Nullable + Integer getNumTotalBatches() + { + return null != limit ? (int) Math.ceil((double) limit / batchSize) : null; + } + + @JsonIgnore + @VisibleForTesting + int computeNextBatchSize(int numSegmentsKilled) + { + return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize; + } + private NavigableMap> getTaskLockMap(TaskActionClient client) throws IOException { final NavigableMap> taskLockMap = new TreeMap<>(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java index 3e8b792eb885..d23b3820db74 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java @@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval())); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java index 5e91ad3375aa..1364bcb597fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java @@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval())); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 5c3186ef3923..58a2ad435b0f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -61,7 +61,7 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @Type(name = "kill", value = KillUnusedSegmentsTask.class), + @Type(name = KillUnusedSegmentsTask.TYPE, value = KillUnusedSegmentsTask.class), @Type(name = "move", value = MoveTask.class), @Type(name = "archive", value = ArchiveTask.class), @Type(name = "restore", value = RestoreTask.class), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 6149208fc391..2d360dfeb521 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -104,7 +104,7 @@ public void testRetrieveUsedSegmentsAction() @Test public void testRetrieveUnusedSegmentsAction() { - final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL); + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUnusedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java index 70cd5fcf19b9..3ab6bae4688f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java @@ -51,8 +51,9 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro "killTaskId", "datasource", Intervals.of("2020-01-01/P1D"), - true, - 99 + false, + 99, + 5 ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class); @@ -61,6 +62,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize())); + Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit()); + } @Test @@ -71,6 +74,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault "datasource", Intervals.of("2020-01-01/P1D"), true, + null, null ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); @@ -80,6 +84,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(100, fromJson.getBatchSize()); + Assert.assertNull(taskQuery.getLimit()); } @Test @@ -91,7 +96,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro Intervals.of("2020-01-01/P1D"), null, true, - 99 + 99, + null ); final byte[] json = objectMapper.writeValueAsBytes(task); final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue( @@ -103,5 +109,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused()); Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); + Assert.assertNull(task.getLimit()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index f57624ed7ddf..3a89c19a785a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -81,6 +81,7 @@ public void testKill() throws Exception Intervals.of("2019-03-01/2019-04-01"), null, false, + null, null ); @@ -97,7 +98,9 @@ public void testKill() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1L, task.getNumBatchesProcessed()); + + Assert.assertEquals(2L, task.getNumBatchesProcessed()); + Assert.assertEquals(1, task.getNumSegmentsKilled()); } @@ -128,6 +131,7 @@ public void testKillWithMarkUnused() throws Exception Intervals.of("2019-03-01/2019-04-01"), null, true, + null, null ); @@ -144,7 +148,8 @@ public void testKillWithMarkUnused() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1L, task.getNumBatchesProcessed()); + Assert.assertEquals(2L, task.getNumBatchesProcessed()); + Assert.assertEquals(1, task.getNumSegmentsKilled()); } @Test @@ -157,13 +162,14 @@ public void testGetInputSourceResources() Intervals.of("2019-03-01/2019-04-01"), null, true, + null, null ); Assert.assertTrue(task.getInputSourceResources().isEmpty()); } @Test - public void testKillBatchSizeOne() throws Exception + public void testKillBatchSizeOneAndLimit4() throws Exception { final String version = DateTimes.nowUtc().toString(); final Set segments = ImmutableSet.of( @@ -176,14 +182,23 @@ public void testKillBatchSizeOne() throws Exception Assert.assertEquals(segments, announced); + Assert.assertEquals( + segments.size(), + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01") + ) + ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( null, DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, - true, - 1 + false, + 1, + 4 ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); @@ -195,6 +210,7 @@ public void testKillBatchSizeOne() throws Exception Assert.assertEquals(Collections.emptyList(), unusedSegments); Assert.assertEquals(4L, task.getNumBatchesProcessed()); + Assert.assertEquals(4, task.getNumSegmentsKilled()); } @Test @@ -218,7 +234,8 @@ public void testKillBatchSizeThree() throws Exception Intervals.of("2018-01-01/2020-01-01"), null, true, - 3 + 3, + null ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); @@ -229,7 +246,120 @@ public void testKillBatchSizeThree() throws Exception getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); Assert.assertEquals(Collections.emptyList(), unusedSegments); - Assert.assertEquals(2L, task.getNumBatchesProcessed()); + Assert.assertEquals(3L, task.getNumBatchesProcessed()); + Assert.assertEquals(4, task.getNumSegmentsKilled()); + } + + @Test + public void testComputeNextBatchSizeDefault() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + null, + null + ); + Assert.assertEquals(100, task.computeNextBatchSize(50)); + } + + @Test + public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 10, + 5 + ); + Assert.assertEquals(5, task.computeNextBatchSize(0)); + } + + @Test + public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 5, + 10 + ); + Assert.assertEquals(5, task.computeNextBatchSize(0)); + } + + @Test + public void testComputeNextBatchSizeWithRemainingLessThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 5, + 10 + ); + Assert.assertEquals(3, task.computeNextBatchSize(7)); + } + + @Test + public void testGetNumTotalBatchesDefault() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + null, + null + ); + Assert.assertNull(task.getNumTotalBatches()); + } + + @Test + public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 10, + 5 + ); + Assert.assertEquals(1, (int) task.getNumTotalBatches()); + } + + @Test + public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() + { + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 5, + 10 + ); + Assert.assertEquals(2, (int) task.getNumTotalBatches()); } private static DataSegment newSegment(Interval interval, String version) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 2ea7fcedf193..31ec645b3fdd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -949,6 +949,7 @@ public DataSegment apply(String input) Intervals.of("2011-04-01/P4D"), null, false, + null, null ); @@ -957,7 +958,7 @@ public DataSegment apply(String input) Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size()); - Assert.assertEquals("delete segment batch call count", 1, mdc.getDeleteSegmentsCount()); + Assert.assertEquals("delete segment batch call count", 2, mdc.getDeleteSegmentsCount()); Assert.assertTrue( "expected unused segments get killed", expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll( @@ -970,6 +971,104 @@ public DataSegment apply(String input) } } + @Test + public void testKillUnusedSegmentsTaskWithMaxSegmentsToKill() throws Exception + { + final File tmpSegmentDir = temporaryFolder.newFolder(); + + List expectedUnusedSegments = Lists.transform( + ImmutableList.of( + "2011-04-01/2011-04-02", + "2011-04-02/2011-04-03", + "2011-04-04/2011-04-05" + ), new Function() + { + @Override + public DataSegment apply(String input) + { + final Interval interval = Intervals.of(input); + try { + return DataSegment.builder() + .dataSource("test_kill_task") + .interval(interval) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + tmpSegmentDir.getCanonicalPath() + + "/druid/localStorage/wikipedia/" + + interval.getStart() + + "-" + + interval.getEnd() + + "/" + + "2011-04-6T16:52:46.119-05:00" + + "/0/index.zip" + ) + ) + .version("2011-04-6T16:52:46.119-05:00") + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(NoneShardSpec.instance()) + .binaryVersion(9) + .size(0) + .build(); + } + catch (IOException e) { + throw new ISE(e, "Error creating segments"); + } + } + } + ); + + mdc.setUnusedSegments(expectedUnusedSegments); + + // manually create local segments files + List segmentFiles = new ArrayList<>(); + for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) { + File file = new File((String) segment.getLoadSpec().get("path")); + FileUtils.mkdirp(file.getParentFile()); + Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY); + segmentFiles.add(file); + } + + final int maxSegmentsToKill = 2; + final Task killUnusedSegmentsTask = + new KillUnusedSegmentsTask( + null, + "test_kill_task", + Intervals.of("2011-04-01/P4D"), + null, + false, + null, + maxSegmentsToKill + ); + + final TaskStatus status = runTask(killUnusedSegmentsTask); + Assert.assertEquals(taskLocation, status.getLocation()); + Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); + Assert.assertEquals("num segments nuked", maxSegmentsToKill, mdc.getNuked().size()); + Assert.assertTrue( + "expected unused segments get killed", + expectedUnusedSegments.containsAll(mdc.getNuked()) + ); + + int expectedNumOfSegmentsRemaining = segmentFiles.size() - maxSegmentsToKill; + int actualNumOfSegmentsRemaining = 0; + for (File file : segmentFiles) { + if (file.exists()) { + actualNumOfSegmentsRemaining++; + } + } + + Assert.assertEquals( + "Expected of segments deleted did not match expectations", + expectedNumOfSegmentsRemaining, + actualNumOfSegmentsRemaining + ); + } + @Test public void testRealtimeishTask() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index c8bf8fd28ab1..66af0a973043 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator { @@ -110,6 +111,22 @@ public List retrieveUnusedSegmentsForInterval(String dataSource, In } } + @Override + public List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval, @Nullable Integer limit) + { + synchronized (unusedSegments) { + Stream resultStream = unusedSegments.stream(); + + resultStream = resultStream.filter(ds -> !nuked.contains(ds)); + + if (limit != null) { + resultStream = resultStream.limit(limit); + } + + return ImmutableList.copyOf(resultStream.iterator()); + } + } + @Override public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval) { diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java index 779c8acc7f28..3676d684097f 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java @@ -24,6 +24,8 @@ import com.google.common.base.Preconditions; import org.joda.time.Interval; +import javax.annotation.Nullable; + import java.util.Objects; /** @@ -40,14 +42,16 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery private final Interval interval; private final Boolean markAsUnused; private final Integer batchSize; + @Nullable private final Integer limit; @JsonCreator public ClientKillUnusedSegmentsTaskQuery( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, - @JsonProperty("markAsUnused") Boolean markAsUnused, - @JsonProperty("batchSize") Integer batchSize + @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, + @JsonProperty("batchSize") Integer batchSize, + @JsonProperty("limit") Integer limit ) { this.id = Preconditions.checkNotNull(id, "id"); @@ -55,6 +59,8 @@ public ClientKillUnusedSegmentsTaskQuery( this.interval = interval; this.markAsUnused = markAsUnused; this.batchSize = batchSize; + Preconditions.checkArgument(limit == null || limit > 0, "limit must be > 0"); + this.limit = limit; } @JsonProperty @@ -96,6 +102,14 @@ public Integer getBatchSize() return batchSize; } + @JsonProperty + @Nullable + public Integer getLimit() + { + return limit; + } + + @Override public boolean equals(Object o) { @@ -110,12 +124,13 @@ public boolean equals(Object o) && Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval) && Objects.equals(markAsUnused, that.markAsUnused) - && Objects.equals(batchSize, that.batchSize); + && Objects.equals(batchSize, that.batchSize) + && Objects.equals(limit, that.limit); } @Override public int hashCode() { - return Objects.hash(id, dataSource, interval, markAsUnused, batchSize); + return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit); } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index b3c70f0cdbe9..3d8f4b858657 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -123,17 +123,30 @@ Collection retrieveUsedSegmentsForIntervals( Segments visibility ); + /** + * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)} + */ + default List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval) + { + return retrieveUnusedSegmentsForInterval(dataSource, interval, null); + } + /** * Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the * metadata store. * - * @param dataSource The data source the segments belong to - * @param interval Filter the data segments to ones that include data in this interval exclusively. + * @param dataSource The data source the segments belong to + * @param interval Filter the data segments to ones that include data in this interval exclusively. + * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. * * @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT * returned here may include data in the interval */ - List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval); + List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable Integer limit + ); /** * Mark as unused segments which include ONLY data within the given interval. diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 107dc4b9f97a..6c4d523133a7 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -190,12 +190,22 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin @Override public List retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval) + { + return retrieveUnusedSegmentsForInterval(dataSource, interval, null); + } + + @Override + public List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable Integer limit + ) { final List matchingSegments = connector.inReadOnlyTransaction( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUnusedSegments(dataSource, Collections.singletonList(interval))) { + .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index d98c5d4d0861..46db26a56d6c 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -574,7 +574,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 13460e2695d0..45896a865ef9 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -37,6 +37,8 @@ import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.ResultIterator; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -104,7 +106,7 @@ public CloseableIterator retrieveUsedSegments( final Collection intervals ) { - return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true); + return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null); } /** @@ -118,10 +120,11 @@ public CloseableIterator retrieveUsedSegments( */ public CloseableIterator retrieveUnusedSegments( final String dataSource, - final Collection intervals + final Collection intervals, + @Nullable final Integer limit ) { - return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false); + return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit); } /** @@ -201,7 +204,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) // Retrieve, then drop, since we can't write a WHERE clause directly. final List segments = ImmutableList.copyOf( Iterators.transform( - retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true), + retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null), DataSegment::getId ) ); @@ -213,7 +216,8 @@ private CloseableIterator retrieveSegments( final String dataSource, final Collection intervals, final IntervalMode matchMode, - final boolean used + final boolean used, + @Nullable final Integer limit ) { // Check if the intervals all support comparing as strings. If so, bake them into the SQL. @@ -259,6 +263,9 @@ private CloseableIterator retrieveSegments( .setFetchSize(connector.getStreamingFetchSize()) .bind("used", used) .bind("dataSource", dataSource); + if (null != limit) { + sql.setMaxRows(limit); + } if (compareAsString) { final Iterator iterator = intervals.iterator(); diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 51b4323a11f4..b4391dc0a8ef 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -84,9 +84,40 @@ public interface OverlordClient * @return future with task ID */ default ListenableFuture runKillTask(String idPrefix, String dataSource, Interval interval) + { + return runKillTask(idPrefix, dataSource, interval, null); + } + + /** + * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}. + * + * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs + * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to + * monitor its progress through the {@link #taskStatus(String)} API. + * + * @param idPrefix Descriptive prefix to include at the start of task IDs + * @param dataSource Datasource to kill + * @param interval Interval to kill + * @param maxSegmentsToKill The maximum number of segments to kill + * + * @return future with task ID + */ + default ListenableFuture runKillTask( + String idPrefix, + String dataSource, + Interval interval, + @Nullable Integer maxSegmentsToKill + ) { final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval); - final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false, null); + final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery( + taskId, + dataSource, + interval, + false, + null, + maxSegmentsToKill + ); return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> taskId); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index f4b6240b43a6..205947b00392 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -33,6 +33,8 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.List; @@ -130,7 +132,12 @@ private void killUnusedSegments(Collection dataSourcesToKill) } try { - FutureUtils.getUnchecked(overlordClient.runKillTask("coordinator-issued", dataSource, intervalToKill), true); + FutureUtils.getUnchecked(overlordClient.runKillTask( + "coordinator-issued", + dataSource, + intervalToKill, + maxSegmentsToKill + ), true); ++submittedTasks; } catch (Exception ex) { @@ -148,6 +155,7 @@ private void killUnusedSegments(Collection dataSourcesToKill) /** * Calculates the interval for which segments are to be killed in a datasource. */ + @Nullable private Interval findIntervalForKill(String dataSource) { final DateTime maxEndTime = ignoreRetainDuration diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 4a00f61300c9..1e146736da13 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -342,7 +342,7 @@ public Response killUnusedSegmentsInInterval( } final Interval theInterval = Intervals.of(interval.replace('_', '/')); try { - FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued", dataSourceName, theInterval), true); + FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null), true); return Response.ok().build(); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java index af9b2c8ec11b..60edff930771 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java @@ -35,13 +35,21 @@ public class ClientKillUnusedSegmentsTaskQueryTest private static final Interval INTERVAL = new Interval(START, START.plus(1)); private static final Boolean MARK_UNUSED = true; private static final Integer BATCH_SIZE = 999; + private static final Integer LIMIT = 1000; ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery; @Before public void setUp() { - clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true, BATCH_SIZE); + clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery( + "killTaskId", + DATA_SOURCE, + INTERVAL, + true, + BATCH_SIZE, + LIMIT + ); } @After @@ -80,12 +88,18 @@ public void testGetBatchSize() Assert.assertEquals(BATCH_SIZE, clientKillUnusedSegmentsQuery.getBatchSize()); } + @Test + public void testGetLimit() + { + Assert.assertEquals(LIMIT, clientKillUnusedSegmentsQuery.getLimit()); + } + @Test public void testEquals() { EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class) .usingGetClass() - .withNonnullFields("id", "dataSource", "interval", "batchSize") + .withNonnullFields("id", "dataSource", "interval", "batchSize", "limit") .verify(); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 74e06bfb6645..444e159411e3 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -932,6 +932,22 @@ public void testSimpleUnusedList() throws IOException ); } + @Test + public void testSimpleUnusedListWithLimit() throws IOException + { + coordinator.announceHistoricalSegments(SEGMENTS); + markAllSegmentsUnused(); + int limit = SEGMENTS.size() - 1; + Set retreivedUnusedSegments = ImmutableSet.copyOf( + coordinator.retrieveUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + limit + ) + ); + Assert.assertEquals(limit, retreivedUnusedSegments.size()); + Assert.assertTrue(SEGMENTS.containsAll(retreivedUnusedSegments)); + } @Test public void testUsedOverlapLow() throws IOException diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index c1e48c496ca1..5b9a88d58410 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -422,7 +422,14 @@ public void test_killPendingSegments() throws Exception public void test_taskPayload() throws ExecutionException, InterruptedException, JsonProcessingException { final String taskID = "taskId_1"; - final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null, null); + final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery( + taskID, + "test", + null, + null, + null, + null + ); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 8f43b36bbe59..039174eac7e0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -216,11 +216,13 @@ public void testMaxSegmentsToKill() private void runAndVerifyKillInterval(Interval expectedKillInterval) { + int limit = config.getCoordinatorKillMaxSegments(); target.run(params); Mockito.verify(overlordClient, Mockito.times(1)).runKillTask( ArgumentMatchers.anyString(), ArgumentMatchers.eq("DS1"), - ArgumentMatchers.eq(expectedKillInterval) + ArgumentMatchers.eq(expectedKillInterval), + ArgumentMatchers.eq(limit) ); } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 5dec3154d6e2..551151458546 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -593,7 +593,7 @@ public void testKillSegmentsInIntervalInDataSource() Interval theInterval = Intervals.of(interval.replace('_', '/')); OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class); - EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval)) + EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null)) .andReturn(Futures.immediateFuture(null)); EasyMock.replay(overlordClient, server);