Fix duplicate compaction task launched by OverlordCompactionScheduler (apache#17287)

Description
-----------
The `OverlordCompactionScheduler` may sometimes launch a duplicate compaction
task for an interval that has just been compacted.

This may happen as follows:
- Scheduler launches a compaction task for an uncompacted interval.
- While the compaction task is running, the `CompactionStatusTracker` does not consider
this interval compactible and returns a `CompactionStatus` of `SKIPPED` for it.
- As soon as the compaction task finishes, the `CompactionStatusTracker` starts considering
the interval eligible for compaction again.
- This interval remains eligible for compaction until the newly published segments are polled
from the database, so the scheduler may launch a duplicate task for it during this window.
- Once the new segments have been polled, the `CompactionStatus` of the interval changes
to `COMPLETE`.

Change
--------
- Keep track of the `snapshotTime` in `DataSourcesSnapshot`. This time represents the start of the poll.
- Use the `snapshotTime` to determine if a poll has happened after a compaction task completed.
- If no poll has happened since the task completed, skip the interval to avoid launching duplicate tasks (a simplified sketch of this check follows the list).
- For tests, use a future `snapshotTime` to ensure that compaction is always triggered.
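
Below is a minimal sketch of the freshness check described above. It uses `java.time` instead of Joda-Time, and the class and method names (`SnapshotFreshnessCheck`, `shouldSkip`) are illustrative only; the actual check lives in `CompactionStatusTracker.computeCompactionStatus` and compares the snapshot time against the update time of the last successful compaction task, as shown in the diff below.

```java
import java.time.Instant;

/**
 * Minimal sketch of the new freshness check (illustrative names only).
 * An interval that was just compacted is skipped until the segment
 * snapshot has been refreshed by a database poll that started after
 * the compaction task succeeded.
 */
public class SnapshotFreshnessCheck
{
  static boolean shouldSkip(Instant snapshotTime, Instant lastSuccessfulTaskTime)
  {
    return snapshotTime != null
           && lastSuccessfulTaskTime != null
           && snapshotTime.isBefore(lastSuccessfulTaskTime);
  }

  public static void main(String[] args)
  {
    final Instant lastPoll = Instant.parse("2024-10-10T10:00:00Z");
    final Instant taskFinished = Instant.parse("2024-10-10T10:05:00Z");

    // Task finished after the last poll: skip the interval to avoid a duplicate task.
    System.out.println(shouldSkip(lastPoll, taskFinished));                      // true

    // A later poll refreshes the snapshot: the interval is considered again,
    // and its status resolves to COMPLETE once the new segments are visible.
    System.out.println(shouldSkip(taskFinished.plusSeconds(300), taskFinished)); // false
  }
}
```

Using the poll start time as the `snapshotTime` keeps the check conservative: if a task finishes while a slow poll is still in flight, the snapshot is treated as stale and the interval remains skipped until the next poll completes.
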
kfaraz authored Oct 10, 2024
1 parent 4fdb381 commit 3f797c5
Showing 15 changed files with 153 additions and 93 deletions.
@@ -41,9 +41,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@@ -54,7 +52,7 @@
@Measurement(iterations = 50)
public class DataSourcesSnapshotBenchmark
{
private static Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");
private static final Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");

@Param({"500", "1000"})
private int numDataSources;
@@ -69,11 +67,10 @@ public void setUp()
{
long start = System.currentTimeMillis();

Map<String, ImmutableDruidDataSource> dataSources = new HashMap<>();
final List<DataSegment> segments = new ArrayList<>();

for (int i = 0; i < numDataSources; i++) {
String dataSource = StringUtils.format("ds-%d", i);
List<DataSegment> segments = new ArrayList<>();

for (int j = 0; j < numSegmentPerDataSource; j++) {
segments.add(
Expand All @@ -90,11 +87,9 @@ public void setUp()
)
);
}

dataSources.put(dataSource, new ImmutableDruidDataSource(dataSource, Collections.emptyMap(), segments));
}

snapshot = new DataSourcesSnapshot(dataSources);
snapshot = DataSourcesSnapshot.fromUsedSegments(segments);

System.out.println("Setup Time " + (System.currentTimeMillis() - start) + " ms");
}
@@ -20,7 +20,6 @@
package org.apache.druid.server.coordinator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -128,7 +127,7 @@ public void setup()
}
}
}
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()).getUsedSegmentsTimelinesPerDataSource();
dataSources = DataSourcesSnapshot.fromUsedSegments(segments).getUsedSegmentsTimelinesPerDataSource();
}

@Benchmark
@@ -23,6 +23,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -52,7 +53,6 @@
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Duration;

import java.util.ArrayList;
@@ -272,7 +272,7 @@ private synchronized void scheduledRun()
private synchronized void runCompactionDuty()
{
final CoordinatorRunStats stats = new CoordinatorRunStats();
duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getEngine(), stats);
duty.run(getLatestConfig(), getDatasourceSnapshot(), supervisorConfig.getEngine(), stats);

// Emit stats only if emission period has elapsed
if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) {
@@ -309,7 +309,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon
if (isRunning()) {
return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig(
getLatestConfig().withClusterConfig(updateRequest),
getCurrentDatasourceTimelines(),
getDatasourceSnapshot(),
supervisorConfig.getEngine()
);
} else {
@@ -334,10 +334,9 @@ private DruidCompactionConfig getLatestConfig()
.withDatasourceConfigs(new ArrayList<>(activeDatasourceConfigs.values()));
}

private Map<String, SegmentTimeline> getCurrentDatasourceTimelines()
private DataSourcesSnapshot getDatasourceSnapshot()
{
return segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments()
.getUsedSegmentsTimelinesPerDataSource();
return segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
}

private void scheduleOnExecutor(Runnable runnable)
@@ -21,11 +21,13 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -40,43 +42,56 @@
*/
public class DataSourcesSnapshot
{
public static DataSourcesSnapshot fromUsedSegments(
Iterable<DataSegment> segments,
ImmutableMap<String, String> dataSourceProperties
)
public static DataSourcesSnapshot fromUsedSegments(Iterable<DataSegment> segments)
{
return fromUsedSegments(segments, DateTimes.nowUtc());
}

/**
* Creates a snapshot of all "used" segments that existed in the database at
* the {@code snapshotTime}.
*/
public static DataSourcesSnapshot fromUsedSegments(Iterable<DataSegment> segments, DateTime snapshotTime)
{
final Map<String, String> dataSourceProperties = ImmutableMap.of("created", DateTimes.nowUtc().toString());
Map<String, DruidDataSource> dataSources = new HashMap<>();
segments.forEach(segment -> {
dataSources
.computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource(dsName, dataSourceProperties))
.addSegmentIfAbsent(segment);
});
return new DataSourcesSnapshot(CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource));
segments.forEach(
segment -> dataSources
.computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource(dsName, dataSourceProperties))
.addSegmentIfAbsent(segment)
);
return new DataSourcesSnapshot(
snapshotTime,
CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource)
);
}

private final DateTime snapshotTime;
private final Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments;
private final Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource;
private final ImmutableSet<DataSegment> overshadowedSegments;

public DataSourcesSnapshot(Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments)
private DataSourcesSnapshot(
DateTime snapshotTime,
Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments
)
{
this(
this.snapshotTime = snapshotTime;
this.dataSourcesWithAllUsedSegments = dataSourcesWithAllUsedSegments;
this.usedSegmentsTimelinesPerDataSource = CollectionUtils.mapValues(
dataSourcesWithAllUsedSegments,
CollectionUtils.mapValues(
dataSourcesWithAllUsedSegments,
dataSource -> SegmentTimeline.forSegments(dataSource.getSegments())
)
dataSource -> SegmentTimeline.forSegments(dataSource.getSegments())
);
this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
}

private DataSourcesSnapshot(
Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments,
Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource
)
/**
* Time when this snapshot was taken. Since polling segments from the database
* may be a slow operation, this represents the poll start time.
*/
public DateTime getSnapshotTime()
{
this.dataSourcesWithAllUsedSegments = dataSourcesWithAllUsedSegments;
this.usedSegmentsTimelinesPerDataSource = usedSegmentsTimelinesPerDataSource;
this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
return snapshotTime;
}

public Collection<ImmutableDruidDataSource> getDataSourcesWithAllUsedSegments()
@@ -1035,6 +1035,7 @@ private void doPoll()

private void doPollSegments()
{
final DateTime startTime = DateTimes.nowUtc();
final Stopwatch stopwatch = Stopwatch.createStarted();

// Some databases such as PostgreSQL require auto-commit turned off
@@ -1071,11 +1072,12 @@ private void doPollSegments()
segments.size(), stopwatch.millisElapsed()
);

createDatasourcesSnapshot(segments);
createDatasourcesSnapshot(startTime, segments);
}

private void doPollSegmentAndSchema()
{
final DateTime startTime = DateTimes.nowUtc();
final Stopwatch stopwatch = Stopwatch.createStarted();

ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentMetadataBuilder = new ImmutableMap.Builder<>();
@@ -1174,15 +1176,15 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);

createDatasourcesSnapshot(segments);
createDatasourcesSnapshot(startTime, segments);
}

private void emitMetric(String metricName, long value)
{
serviceEmitter.emit(new ServiceMetricEvent.Builder().setMetric(metricName, value));
}

private void createDatasourcesSnapshot(List<DataSegment> segments)
private void createDatasourcesSnapshot(DateTime snapshotTime, List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
@@ -1193,11 +1195,10 @@ private void createDatasourcesSnapshot(List<DataSegment> segments)
// segment mark calls in rapid succession. So the snapshot update is not done outside of database poll at this time.
// Updates outside of database polls were primarily for the user experience, so users would immediately see the
// effect of a segment mark call reflected in MetadataResource API calls.
ImmutableMap<String, String> dataSourceProperties = createDefaultDataSourceProperties();

dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
dataSourceProperties
snapshotTime
);
emitMetric("segment/buildSnapshot/time", stopwatch.millisElapsed());
log.debug(
Expand All @@ -1206,11 +1207,6 @@ private void createDatasourcesSnapshot(List<DataSegment> segments)
);
}

private static ImmutableMap<String, String> createDefaultDataSourceProperties()
{
return ImmutableMap.of("created", DateTimes.nowUtc().toString());
}

/**
* For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
* (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
@@ -21,6 +21,7 @@

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
@@ -41,7 +42,6 @@
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -76,7 +76,7 @@ public CompactionRunSimulator(
*/
public CompactionSimulateResult simulateRunWithConfig(
DruidCompactionConfig compactionConfig,
Map<String, SegmentTimeline> datasourceTimelines,
DataSourcesSnapshot dataSourcesSnapshot,
CompactionEngine defaultEngine
)
{
@@ -146,7 +146,7 @@ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCan
final CoordinatorRunStats stats = new CoordinatorRunStats();
new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
configWithUnlimitedTaskSlots,
datasourceTimelines,
dataSourcesSnapshot,
defaultEngine,
stats
);
@@ -34,6 +34,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* Tracks status of recently submitted compaction tasks. Can be used by a segment
Expand All @@ -50,6 +51,8 @@ public class CompactionStatusTracker
private final ConcurrentHashMap<String, CompactionCandidate> submittedTaskIdToSegments
= new ConcurrentHashMap<>();

private final AtomicReference<DateTime> segmentSnapshotTime = new AtomicReference<>();

@Inject
public CompactionStatusTracker(ObjectMapper objectMapper)
{
@@ -109,6 +112,16 @@ public CompactionStatus computeCompactionStatus(
return CompactionStatus.skipped("Task for interval is already running");
}

// Skip intervals that have been recently compacted if segment timeline is not updated yet
final DateTime snapshotTime = segmentSnapshotTime.get();
if (lastTaskStatus != null
&& lastTaskStatus.getState() == TaskState.SUCCESS
&& snapshotTime != null && snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) {
return CompactionStatus.skipped(
"Segment timeline not updated since last compaction task succeeded"
);
}

// Skip intervals that have been filtered out by the policy
if (!searchPolicy.isEligibleForCompaction(candidate, compactionStatus, lastTaskStatus)) {
return CompactionStatus.skipped("Rejected by search policy");
@@ -125,6 +138,11 @@ public void onCompactionStatusComputed(
// Nothing to do, used by simulator
}

public void onSegmentTimelineUpdated(DateTime snapshotTime)
{
this.segmentSnapshotTime.set(snapshotTime);
}

public void onCompactionConfigUpdated(DruidCompactionConfig compactionConfig)
{
final Set<String> compactionEnabledDatasources = new HashSet<>();
@@ -27,6 +27,7 @@
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
@@ -37,6 +38,7 @@
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -333,8 +335,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon
return new CompactionRunSimulator(compactionStatusTracker, overlordClient).simulateRunWithConfig(
metadataManager.configs().getCurrentCompactionConfig().withClusterConfig(updateRequest),
metadataManager.segments()
.getSnapshotOfDataSourcesWithAllUsedSegments()
.getUsedSegmentsTimelinesPerDataSource(),
.getSnapshotOfDataSourcesWithAllUsedSegments(),
CompactionEngine.NATIVE
);
}
@@ -669,9 +670,23 @@ public void run()
}

if (metadataManager.isStarted() && serverInventoryView.isStarted()) {
final DataSourcesSnapshot dataSourcesSnapshot;
if (dutyGroup.getName().equals(COMPACT_SEGMENTS_DUTIES_DUTY_GROUP)) {
// If this is a compact segments duty group triggered by IT,
// use a future snapshotTime to ensure that compaction always runs
dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
metadataManager.segments()
.getSnapshotOfDataSourcesWithAllUsedSegments()
.iterateAllUsedSegmentsInSnapshot(),
DateTimes.nowUtc().plusMinutes(60)
);
} else {
dataSourcesSnapshot = metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments();
}

final DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.builder()
.withDataSourcesSnapshot(metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments())
.withDataSourcesSnapshot(dataSourcesSnapshot)
.withDynamicConfigs(metadataManager.configs().getCurrentDynamicConfig())
.withCompactionConfig(metadataManager.configs().getCurrentCompactionConfig())
.build();