diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DataSourcesSnapshotBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DataSourcesSnapshotBenchmark.java
index ab6fff0186e0..23067aa83183 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/DataSourcesSnapshotBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DataSourcesSnapshotBenchmark.java
@@ -41,9 +41,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @State(Scope.Benchmark)
@@ -54,7 +52,7 @@
 @Measurement(iterations = 50)
 public class DataSourcesSnapshotBenchmark
 {
-  private static Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");
+  private static final Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");
 
   @Param({"500", "1000"})
   private int numDataSources;
@@ -69,11 +67,10 @@ public void setUp()
   {
     long start = System.currentTimeMillis();
 
-    Map<String, ImmutableDruidDataSource> dataSources = new HashMap<>();
+    final List<DataSegment> segments = new ArrayList<>();
     for (int i = 0; i < numDataSources; i++) {
       String dataSource = StringUtils.format("ds-%d", i);
-      List<DataSegment> segments = new ArrayList<>();
 
       for (int j = 0; j < numSegmentPerDataSource; j++) {
         segments.add(
@@ -90,11 +87,9 @@ public void setUp()
           )
         );
       }
-
-      dataSources.put(dataSource, new ImmutableDruidDataSource(dataSource, Collections.emptyMap(), segments));
     }
 
-    snapshot = new DataSourcesSnapshot(dataSources);
+    snapshot = DataSourcesSnapshot.fromUsedSegments(segments);
 
     System.out.println("Setup Time " + (System.currentTimeMillis() - start) + " ms");
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index fb938dca4cc2..78c166e266b9 100644
--- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -20,7 +20,6 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -128,7 +127,7 @@ public void setup()
         }
       }
     }
-    dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()).getUsedSegmentsTimelinesPerDataSource();
+    dataSources = DataSourcesSnapshot.fromUsedSegments(segments).getUsedSegmentsTimelinesPerDataSource();
   }
 
   @Benchmark
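For other callers the migration is mechanical: the dataSourceProperties argument is gone, and the factory now builds the "created" property internally. A minimal sketch of the new call shape, assuming Druid on the classpath (the class and method names below are illustrative, not part of the patch):

    import org.apache.druid.client.DataSourcesSnapshot;
    import org.apache.druid.timeline.DataSegment;

    import java.util.Collections;
    import java.util.List;

    class SnapshotUsageSketch
    {
      static DataSourcesSnapshot build(List<DataSegment> usedSegments)
      {
        // Old: DataSourcesSnapshot.fromUsedSegments(usedSegments, ImmutableMap.of())
        // New: the properties map is created internally, and the snapshot
        // additionally records the time at which it was taken.
        return DataSourcesSnapshot.fromUsedSegments(usedSegments);
      }

      public static void main(String[] args)
      {
        DataSourcesSnapshot snapshot = build(Collections.emptyList());
        System.out.println(snapshot.getSnapshotTime());
      }
    }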
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
index 9e2668c81097..020c0a0ac22c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
@@ -23,6 +23,7 @@
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.inject.Inject;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -52,7 +53,6 @@
 import org.apache.druid.server.coordinator.stats.CoordinatorStat;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.Stats;
-import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Duration;
 
 import java.util.ArrayList;
@@ -272,7 +272,7 @@ private synchronized void scheduledRun()
   private synchronized void runCompactionDuty()
   {
     final CoordinatorRunStats stats = new CoordinatorRunStats();
-    duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getEngine(), stats);
+    duty.run(getLatestConfig(), getDatasourceSnapshot(), supervisorConfig.getEngine(), stats);
 
     // Emit stats only if emission period has elapsed
     if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) {
@@ -309,7 +309,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest)
     if (isRunning()) {
       return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig(
           getLatestConfig().withClusterConfig(updateRequest),
-          getCurrentDatasourceTimelines(),
+          getDatasourceSnapshot(),
           supervisorConfig.getEngine()
       );
     } else {
@@ -334,10 +334,9 @@ private DruidCompactionConfig getLatestConfig()
                              .withDatasourceConfigs(new ArrayList<>(activeDatasourceConfigs.values()));
   }
 
-  private Map<String, SegmentTimeline> getCurrentDatasourceTimelines()
+  private DataSourcesSnapshot getDatasourceSnapshot()
   {
-    return segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments()
-                         .getUsedSegmentsTimelinesPerDataSource();
+    return segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
   }
 
   private void scheduleOnExecutor(Runnable runnable)
diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
index 5f58117a4ec9..27f16fcddb16 100644
--- a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
+++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
@@ -21,11 +21,13 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -40,43 +42,56 @@
  */
 public class DataSourcesSnapshot
 {
-  public static DataSourcesSnapshot fromUsedSegments(
-      Iterable<DataSegment> segments,
-      ImmutableMap<String, String> dataSourceProperties
-  )
+  public static DataSourcesSnapshot fromUsedSegments(Iterable<DataSegment> segments)
   {
+    return fromUsedSegments(segments, DateTimes.nowUtc());
+  }
+
+  /**
+   * Creates a snapshot of all "used" segments that existed in the database at
+   * the {@code snapshotTime}.
+   */
+  public static DataSourcesSnapshot fromUsedSegments(Iterable<DataSegment> segments, DateTime snapshotTime)
+  {
+    final Map<String, String> dataSourceProperties = ImmutableMap.of("created", DateTimes.nowUtc().toString());
     Map<String, DruidDataSource> dataSources = new HashMap<>();
-    segments.forEach(segment -> {
-      dataSources
-          .computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource(dsName, dataSourceProperties))
-          .addSegmentIfAbsent(segment);
-    });
-    return new DataSourcesSnapshot(CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource));
+    segments.forEach(
+        segment -> dataSources
+            .computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource(dsName, dataSourceProperties))
+            .addSegmentIfAbsent(segment)
+    );
+    return new DataSourcesSnapshot(
+        snapshotTime,
+        CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource)
+    );
   }
 
+  private final DateTime snapshotTime;
   private final Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments;
   private final Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource;
   private final ImmutableSet<DataSegment> overshadowedSegments;
 
-  public DataSourcesSnapshot(Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments)
+  private DataSourcesSnapshot(
+      DateTime snapshotTime,
+      Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments
+  )
   {
-    this(
+    this.snapshotTime = snapshotTime;
+    this.dataSourcesWithAllUsedSegments = dataSourcesWithAllUsedSegments;
+    this.usedSegmentsTimelinesPerDataSource = CollectionUtils.mapValues(
         dataSourcesWithAllUsedSegments,
-        CollectionUtils.mapValues(
-            dataSourcesWithAllUsedSegments,
-            dataSource -> SegmentTimeline.forSegments(dataSource.getSegments())
-        )
+        dataSource -> SegmentTimeline.forSegments(dataSource.getSegments())
     );
+    this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
   }
 
-  private DataSourcesSnapshot(
-      Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments,
-      Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource
-  )
+  /**
+   * Time when this snapshot was taken. Since polling segments from the database
+   * may be a slow operation, this represents the poll start time.
+   */
+  public DateTime getSnapshotTime()
   {
-    this.dataSourcesWithAllUsedSegments = dataSourcesWithAllUsedSegments;
-    this.usedSegmentsTimelinesPerDataSource = usedSegmentsTimelinesPerDataSource;
-    this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
+    return snapshotTime;
   }
 
   public Collection<ImmutableDruidDataSource> getDataSourcesWithAllUsedSegments()
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 12327785b924..053dc10f41c2 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -1035,6 +1035,7 @@ private void doPoll()
 
   private void doPollSegments()
   {
+    final DateTime startTime = DateTimes.nowUtc();
     final Stopwatch stopwatch = Stopwatch.createStarted();
 
     // Some databases such as PostgreSQL require auto-commit turned off
@@ -1071,11 +1072,12 @@ private void doPollSegments()
         segments.size(), stopwatch.millisElapsed()
     );
-    createDatasourcesSnapshot(segments);
+    createDatasourcesSnapshot(startTime, segments);
   }
 
   private void doPollSegmentAndSchema()
   {
+    final DateTime startTime = DateTimes.nowUtc();
     final Stopwatch stopwatch = Stopwatch.createStarted();
 
     ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentMetadataBuilder = new ImmutableMap.Builder<>();
@@ -1174,7 +1176,7 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
         segments.size(), schemaMap.size(), stopwatch.millisElapsed()
     );
-    createDatasourcesSnapshot(segments);
+    createDatasourcesSnapshot(startTime, segments);
   }
 
@@ -1182,7 +1184,7 @@ private void emitMetric(String metricName, long value)
     serviceEmitter.emit(new ServiceMetricEvent.Builder().setMetric(metricName, value));
   }
 
-  private void createDatasourcesSnapshot(List<DataSegment> segments)
+  private void createDatasourcesSnapshot(DateTime snapshotTime, List<DataSegment> segments)
   {
     final Stopwatch stopwatch = Stopwatch.createStarted();
     // dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
@@ -1193,11 +1195,10 @@ private void createDatasourcesSnapshot(List<DataSegment> segments)
     // segment mark calls in rapid succession. So the snapshot update is not done outside of database poll at this time.
     // Updates outside of database polls were primarily for the user experience, so users would immediately see the
    // effect of a segment mark call reflected in MetadataResource API calls.
-    ImmutableMap<String, String> dataSourceProperties = createDefaultDataSourceProperties();
 
     dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
         Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
-        dataSourceProperties
+        snapshotTime
     );
     emitMetric("segment/buildSnapshot/time", stopwatch.millisElapsed());
     log.debug(
@@ -1206,11 +1207,6 @@
     );
   }
 
-  private static ImmutableMap<String, String> createDefaultDataSourceProperties()
-  {
-    return ImmutableMap.of("created", DateTimes.nowUtc().toString());
-  }
-
   /**
    * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
    * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
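The manager now stamps the snapshot with the instant the poll began rather than when it finished; per the javadoc above, a poll can be slow, and the snapshot is only trustworthy as of its start. A rough sketch of the pattern, with a hypothetical pollSegmentsFromDb() standing in for the real metadata query:

    import org.apache.druid.client.DataSourcesSnapshot;
    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.timeline.DataSegment;
    import org.joda.time.DateTime;

    import java.util.Collections;
    import java.util.List;

    class PollTimeSketch
    {
      DataSourcesSnapshot pollAndSnapshot()
      {
        // Capture the time before the (potentially slow) database poll so the
        // snapshot is not stamped newer than the data it actually reflects.
        final DateTime pollStartTime = DateTimes.nowUtc();
        final List<DataSegment> segments = pollSegmentsFromDb();
        return DataSourcesSnapshot.fromUsedSegments(segments, pollStartTime);
      }

      private List<DataSegment> pollSegmentsFromDb()
      {
        return Collections.emptyList(); // placeholder for the real SQL poll
      }
    }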
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
index e3b1c66fba8e..db0a5f8c98c4 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
@@ -21,6 +21,7 @@
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
@@ -41,7 +42,6 @@
 import org.apache.druid.server.coordinator.DruidCompactionConfig;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
-import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -76,7 +76,7 @@ public CompactionRunSimulator(
    */
   public CompactionSimulateResult simulateRunWithConfig(
       DruidCompactionConfig compactionConfig,
-      Map<String, SegmentTimeline> datasourceTimelines,
+      DataSourcesSnapshot dataSourcesSnapshot,
       CompactionEngine defaultEngine
   )
   {
@@ -146,7 +146,7 @@ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCandidate candidateSegments)
     final CoordinatorRunStats stats = new CoordinatorRunStats();
     new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
         configWithUnlimitedTaskSlots,
-        datasourceTimelines,
+        dataSourcesSnapshot,
         defaultEngine,
         stats
     );
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
index ab7ddbbb91a7..cbf5f25f9d7b 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
@@ -34,6 +34,7 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Tracks status of recently submitted compaction tasks. Can be used by a segment
@@ -50,6 +51,8 @@ public class CompactionStatusTracker
   private final ConcurrentHashMap<String, CompactionCandidate> submittedTaskIdToSegments
       = new ConcurrentHashMap<>();
 
+  private final AtomicReference<DateTime> segmentSnapshotTime = new AtomicReference<>();
+
   @Inject
   public CompactionStatusTracker(ObjectMapper objectMapper)
   {
@@ -109,6 +112,16 @@ public CompactionStatus computeCompactionStatus(
       return CompactionStatus.skipped("Task for interval is already running");
     }
 
+    // Skip intervals that have been recently compacted if segment timeline is not updated yet
+    final DateTime snapshotTime = segmentSnapshotTime.get();
+    if (lastTaskStatus != null
+        && lastTaskStatus.getState() == TaskState.SUCCESS
+        && snapshotTime != null && snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) {
+      return CompactionStatus.skipped(
+          "Segment timeline not updated since last compaction task succeeded"
+      );
+    }
+
     // Skip intervals that have been filtered out by the policy
     if (!searchPolicy.isEligibleForCompaction(candidate, compactionStatus, lastTaskStatus)) {
       return CompactionStatus.skipped("Rejected by search policy");
@@ -125,6 +138,11 @@ public void onCompactionStatusComputed(
     // Nothing to do, used by simulator
   }
 
+  public void onSegmentTimelineUpdated(DateTime snapshotTime)
+  {
+    this.segmentSnapshotTime.set(snapshotTime);
+  }
+
   public void onCompactionConfigUpdated(DruidCompactionConfig compactionConfig)
   {
     final Set<String> compactionEnabledDatasources = new HashSet<>();
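The guard added above is the heart of the patch: if the last compaction task for an interval succeeded after the current segment snapshot was taken, the timeline may still show the pre-compaction segments, and resubmitting would compact the same data twice. Reduced to a standalone predicate (a sketch, not the shipped method):

    import org.joda.time.DateTime;

    class TimelineSkipRuleSketch
    {
      // Skip when the snapshot predates the last successful task update,
      // i.e. the poll cannot yet have observed the segments that task published.
      static boolean shouldSkip(DateTime snapshotTime, DateTime lastSuccessUpdatedTime)
      {
        return snapshotTime != null
               && lastSuccessUpdatedTime != null
               && snapshotTime.isBefore(lastSuccessUpdatedTime);
      }
    }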
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 4c16ba1107cd..f8c29cd79849 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -27,6 +27,7 @@
 import it.unimi.dsi.fastutil.objects.Object2IntMaps;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
@@ -37,6 +38,7 @@
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -333,8 +335,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest)
     return new CompactionRunSimulator(compactionStatusTracker, overlordClient).simulateRunWithConfig(
         metadataManager.configs().getCurrentCompactionConfig().withClusterConfig(updateRequest),
         metadataManager.segments()
-                       .getSnapshotOfDataSourcesWithAllUsedSegments()
-                       .getUsedSegmentsTimelinesPerDataSource(),
+                       .getSnapshotOfDataSourcesWithAllUsedSegments(),
         CompactionEngine.NATIVE
     );
   }
@@ -669,9 +670,23 @@ public void run()
         }
 
         if (metadataManager.isStarted() && serverInventoryView.isStarted()) {
+          final DataSourcesSnapshot dataSourcesSnapshot;
+          if (dutyGroup.getName().equals(COMPACT_SEGMENTS_DUTIES_DUTY_GROUP)) {
+            // If this is a compact segments duty group triggered by IT,
+            // use a future snapshotTime to ensure that compaction always runs
+            dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
+                metadataManager.segments()
+                               .getSnapshotOfDataSourcesWithAllUsedSegments()
+                               .iterateAllUsedSegmentsInSnapshot(),
+                DateTimes.nowUtc().plusMinutes(60)
+            );
+          } else {
+            dataSourcesSnapshot = metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments();
+          }
+
           final DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
               .builder()
-              .withDataSourcesSnapshot(metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments())
+              .withDataSourcesSnapshot(dataSourcesSnapshot)
               .withDynamicConfigs(metadataManager.configs().getCurrentDynamicConfig())
               .withCompactionConfig(metadataManager.configs().getCurrentCompactionConfig())
               .build();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index ebef87546320..576b2155ac7e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -20,7 +20,6 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import org.apache.druid.server.coordinator.loading.SegmentHolder;
@@ -331,7 +330,7 @@ public Builder withUsedSegments(DataSegment... usedSegments)
     public Builder withUsedSegments(Collection<DataSegment> usedSegments)
     {
       this.usedSegmentsNewestFirst = createUsedSegmentsSet(usedSegments);
-      this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments, ImmutableMap.of());
+      this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments);
       return this;
     }
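The coordinator change above deliberately defeats that guard for the custom compact-segments duty group: a snapshotTime stamped an hour in the future can never be before a task's completion time, so integration-test-triggered runs are never skipped. The trick in isolation (a sketch; "existing" stands for any freshly polled snapshot):

    import org.apache.druid.client.DataSourcesSnapshot;
    import org.apache.druid.java.util.common.DateTimes;

    class AlwaysEligibleSnapshotSketch
    {
      static DataSourcesSnapshot withFutureSnapshotTime(DataSourcesSnapshot existing)
      {
        // A future snapshotTime makes the "timeline not updated" check in
        // CompactionStatusTracker evaluate to false for every interval.
        return DataSourcesSnapshot.fromUsedSegments(
            existing.iterateAllUsedSegmentsInSnapshot(),
            DateTimes.nowUtc().plusMinutes(60)
        );
      }
    }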
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 035286692bf5..ca80c7efe30e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -24,6 +24,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.indexing.ClientCompactionIOConfig;
 import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
 import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
@@ -61,7 +62,6 @@
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -129,7 +129,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     } else {
       run(
           params.getCompactionConfig(),
-          params.getUsedSegmentsTimelinesPerDataSource(),
+          params.getDataSourcesSnapshot(),
           CompactionEngine.NATIVE,
           params.getCoordinatorStats()
       );
@@ -139,7 +139,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
   public void run(
       DruidCompactionConfig dynamicConfig,
-      Map<String, SegmentTimeline> dataSources,
+      DataSourcesSnapshot dataSources,
       CompactionEngine defaultEngine,
       CoordinatorRunStats stats
   )
@@ -150,6 +150,7 @@ public void run(
       return;
     }
 
+    statusTracker.onSegmentTimelineUpdated(dataSources.getSnapshotTime());
     statusTracker.onCompactionConfigUpdated(dynamicConfig);
     List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
     if (compactionConfigList == null || compactionConfigList.isEmpty()) {
@@ -225,7 +226,7 @@ public void run(
     final CompactionSegmentIterator iterator = new PriorityBasedCompactionSegmentIterator(
         policy,
         compactionConfigs,
-        dataSources,
+        dataSources.getUsedSegmentsTimelinesPerDataSource(),
         intervalsToSkipCompaction,
         statusTracker
     );
diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
index aacf4216de78..690a4bcfc7cb 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
@@ -76,8 +76,7 @@ public void testSimulateClusterCompactionConfigUpdate()
         DruidCompactionConfig.empty().withDatasourceConfig(
             DataSourceCompactionConfig.builder().forDataSource("wiki").build()
         ),
-        segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
-                               .getUsedSegmentsTimelinesPerDataSource(),
+        segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments(),
         CompactionEngine.NATIVE
     );
diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
index f64c0865d6db..8cd20bfdb88b 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
@@ -24,8 +24,10 @@
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.TestDataSource;
 import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Before;
@@ -104,6 +106,39 @@ public void testGetLatestTaskStatusForRepeatedlyFailingTask()
     Assert.assertEquals(2, status.getNumConsecutiveFailures());
   }
 
+  @Test
+  public void testComputeCompactionStatusForSuccessfulTask()
+  {
+    final DataSourceCompactionConfig compactionConfig
+        = DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build();
+    final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(null);
+    final CompactionCandidate candidateSegments
+        = CompactionCandidate.from(Collections.singletonList(WIKI_SEGMENT));
+
+    // Verify that interval is originally eligible for compaction
+    CompactionStatus status
+        = statusTracker.computeCompactionStatus(candidateSegments, compactionConfig, policy);
+    Assert.assertEquals(CompactionStatus.State.PENDING, status.getState());
+    Assert.assertEquals("not compacted yet", status.getReason());
+
+    // Verify that interval is skipped for compaction after task has finished
+    statusTracker.onSegmentTimelineUpdated(DateTimes.nowUtc().minusMinutes(1));
+    statusTracker.onTaskSubmitted(createCompactionTask("task1"), candidateSegments);
+    statusTracker.onTaskFinished("task1", TaskStatus.success("task1"));
+
+    status = statusTracker.computeCompactionStatus(candidateSegments, compactionConfig, policy);
+    Assert.assertEquals(CompactionStatus.State.SKIPPED, status.getState());
+    Assert.assertEquals(
+        "Segment timeline not updated since last compaction task succeeded",
+        status.getReason()
+    );
+
+    // Verify that interval becomes eligible again after timeline has been updated
+    statusTracker.onSegmentTimelineUpdated(DateTimes.nowUtc());
+    status = statusTracker.computeCompactionStatus(candidateSegments, compactionConfig, policy);
+    Assert.assertEquals(CompactionStatus.State.PENDING, status.getState());
+  }
+
   private ClientCompactionTaskQuery createCompactionTask(
       String taskId
   )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 0e8ff749dbf3..512efd0316e8 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -777,7 +777,6 @@ public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
         )
     ).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes();
     EasyMock.replay(configManager);
-    DruidDataSource dataSource = new DruidDataSource("dataSource1", Collections.emptyMap());
     DataSegment dataSegment = new DataSegment(
         "dataSource1",
         Intervals.of("2010-01-01/P1D"),
@@ -789,9 +788,9 @@ public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
         0x9,
         0
     );
-    dataSource.addSegment(dataSegment);
-    DataSourcesSnapshot dataSourcesSnapshot =
-        new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
+    DataSourcesSnapshot dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
+        Collections.singleton(dataSegment)
+    );
     EasyMock
         .expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
         .andReturn(dataSourcesSnapshot)
@@ -969,9 +968,7 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception
   @Test
   public void testSimulateRunWithEmptyDatasourceCompactionConfigs()
   {
-    DruidDataSource dataSource = new DruidDataSource("dataSource", Collections.emptyMap());
-    DataSourcesSnapshot dataSourcesSnapshot =
-        new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
+    DataSourcesSnapshot dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(Collections.emptyList());
     EasyMock
         .expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
         .andReturn(dataSourcesSnapshot)
@@ -1022,7 +1019,7 @@ private void setupSegmentsMetadataMock(DruidDataSource dataSource)
         .andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
         .anyTimes();
     DataSourcesSnapshot dataSourcesSnapshot =
-        new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
+        DataSourcesSnapshot.fromUsedSegments(dataSource.getSegments());
     EasyMock
         .expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
         .andReturn(dataSourcesSnapshot)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 4009081474be..31789dd540ca 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -176,6 +176,7 @@ public static Collection<Object[]> constructorFeeder()
   private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
   private final CompactionEngine engine;
 
+  private final List<DataSegment> allSegments = new ArrayList<>();
   private DataSourcesSnapshot dataSources;
   private CompactionStatusTracker statusTracker;
   Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>();
@@ -194,7 +195,7 @@ public CompactSegmentsTest(
   @Before
   public void setup()
   {
-    List<DataSegment> allSegments = new ArrayList<>();
+    allSegments.clear();
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
       for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
@@ -209,7 +210,7 @@ public void setup()
         }
       }
     }
-    dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments, ImmutableMap.of());
+    dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments);
     statusTracker = new CompactionStatusTracker(JSON_MAPPER);
   }
 
@@ -435,7 +436,7 @@ public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIntervals()
       }
     }
 
-    dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
+    dataSources = DataSourcesSnapshot.fromUsedSegments(segments);
 
     final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
     final CompactSegments compactSegments = new CompactSegments(statusTracker, overlordClient);
@@ -589,7 +590,7 @@ public void testMakeStatsForDataSourceWithSkipped()
       }
     }
 
-    dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
+    dataSources = DataSourcesSnapshot.fromUsedSegments(segments);
 
     final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
     final CompactSegments compactSegments = new CompactSegments(statusTracker, overlordClient);
@@ -1497,7 +1498,7 @@ public void testDetermineSegmentGranularityFromSegmentsToCompact()
             10L
         )
     );
-    dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
+    dataSources = DataSourcesSnapshot.fromUsedSegments(segments);
 
     final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
     final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
@@ -1584,7 +1585,7 @@ public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionConfig()
             10L
         )
     );
-    dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
+    dataSources = DataSourcesSnapshot.fromUsedSegments(segments);
 
     final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
     final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
@@ -1988,22 +1989,14 @@ private void assertLastSegmentNotCompacted(CompactSegments compactSegments)
 
   private void addMoreData(String dataSource, int day)
   {
-    final SegmentTimeline timeline
-        = dataSources.getUsedSegmentsTimelinesPerDataSource().get(dataSource);
     for (int i = 0; i < 2; i++) {
-      DataSegment newSegment = createSegment(dataSource, day, true, i);
-      timeline.add(
-          newSegment.getInterval(),
-          newSegment.getVersion(),
-          newSegment.getShardSpec().createChunk(newSegment)
-      );
-      newSegment = createSegment(dataSource, day, false, i);
-      timeline.add(
-          newSegment.getInterval(),
-          newSegment.getVersion(),
-          newSegment.getShardSpec().createChunk(newSegment)
-      );
+      allSegments.add(createSegment(dataSource, day, true, i));
+      allSegments.add(createSegment(dataSource, day, false, i));
     }
+
+    // Recreate the DataSourcesSnapshot with a future snapshotTime so that the
+    // statusTracker considers the intervals with new data eligible for compaction again
+    dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments, DateTimes.nowUtc().plusMinutes(10));
   }
 
   private List<DataSourceCompactionConfig> createCompactionConfigs()
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index 934f61dbe07e..ad8294786e6d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -20,7 +20,6 @@
 package org.apache.druid.server.coordinator.simulate;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.metadata.SegmentsMetadataManager;
@@ -171,7 +170,7 @@ public Collection<ImmutableDruidDataSource> getImmutableDataSourcesWithAllUsedSegments()
   public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
   {
     if (snapshot == null) {
-      snapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments.values(), ImmutableMap.of());
+      snapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments.values());
     }
     return snapshot;
   }