diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 417fdb60d0f3..e20188d58294 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -129,21 +130,35 @@ public MSQCompactionRunner(
* The following configs aren't supported:
*
* - partitionsSpec of type HashedPartitionsSpec.
+ * - 'range' partitionsSpec with non-string partition dimensions.
* - maxTotalRows in DynamicPartitionsSpec.
- * - rollup in granularitySpec set to false when metricsSpec is specified or true when it's null.
- * Null is treated as true if metricsSpec exist and false if empty.
- * - any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.
+ * - Rollup set to true without a metricsSpec being specified, or vice versa.
+ * - Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.
+ * - Multiple disjoint intervals in the compaction task.
*
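+ * @param intervalToDataSchemaMap map from each compaction interval to the DataSchema derived for it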
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask
+ CompactionTask compactionTask,
+      Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
+ if (intervalToDataSchemaMap.size() > 1) {
+      // We currently cannot handle multiple intervals in the map, for several reasons -- one being that the worker
+      // IDs of sequentially launched MSQControllerTasks would clash, since each worker ID is derived from the
+      // MSQControllerTask ID, which in turn equals the CompactionTask ID.
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Disjoint compaction intervals[%s] not supported",
+ intervalToDataSchemaMap.keySet()
+ );
+ }
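+    // Beyond this check, the map is assumed to contain exactly one entry (see the Iterables.getOnlyElement call below).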
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (compactionTask.getTuningConfig() != null) {
- validationResults.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
- compactionTask.getTuningConfig().getPartitionsSpec())
+ validationResults.add(
+ ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
+ compactionTask.getTuningConfig().getPartitionsSpec(),
+ Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
+ )
);
}
if (compactionTask.getGranularitySpec() != null) {
@@ -300,7 +315,7 @@ private static RowSignature getRowSignature(DataSchema dataSchema)
rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
}
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
- rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
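+      // Use the dimension's ColumnType directly instead of re-deriving it from the type-name string.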
+ rowSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
}
// There can be columns that are part of metricsSpec for a datasource.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
@@ -416,7 +431,9 @@ private static boolean isGroupBy(DataSchema dataSchema)
{
if (dataSchema.getGranularitySpec() != null) {
// If rollup is true without any metrics, all columns are treated as dimensions and
- // duplicate rows are removed in line with native compaction.
+      // duplicate rows are removed in line with native compaction. This case can only arise when rollup is left
+      // null in the compaction spec and segment analysis then infers it to be true. The metrics=null, rollup=true
+      // combination, in turn, can only have been recorded for natively ingested segments.
return dataSchema.getGranularitySpec().isRollup();
}
// If no rollup specified, decide based on whether metrics are present.
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 15b12be15753..0b5395d727fe 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -41,6 +42,7 @@
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@@ -96,7 +98,6 @@ public class MSQCompactionRunnerTest
private static final int MAX_ROWS_PER_SEGMENT = 150000;
private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
- private static List PARTITION_DIMENSIONS;
private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false);
private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null);
@@ -106,24 +107,49 @@ public class MSQCompactionRunnerTest
LONG_DIMENSION,
MV_STRING_DIMENSION
);
+  private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
+ COMPACTION_INTERVAL,
+ new DataSchema.Builder()
+ .withDataSource(DATA_SOURCE)
+ .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+ .withDimensions(new DimensionsSpec(DIMENSIONS))
+ .build()
+ );
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(AGG1, AGG2);
private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null);
+  private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName());
+
@BeforeClass
public static void setupClass()
{
NullHandling.initializeForTests();
+ }
- final StringDimensionSchema stringDimensionSchema = new StringDimensionSchema(
- "string_dim",
+ @Test
+ public void testMultipleDisjointCompactionIntervalsAreInvalid()
+ {
+    Map<Interval, DataSchema> intervalDataschemas = new HashMap<>(INTERVAL_DATASCHEMAS);
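+    // Add a second, disjoint interval so that validation must reject the task.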
+ intervalDataschemas.put(Intervals.of("2017-07-01/2018-01-01"), null);
+ CompactionTask compactionTask = createCompactionTask(
+ new HashedPartitionsSpec(3, null, ImmutableList.of("dummy")),
+ null,
+ Collections.emptyMap(),
null,
null
);
-
- PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName());
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+ compactionTask,
+ intervalDataschemas
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format("MSQ: Disjoint compaction intervals[%s] not supported", intervalDataschemas.keySet()),
+ validationResult.getReason()
+ );
}
@Test
@@ -136,11 +162,11 @@ public void testHashedPartitionsSpecIsInvalid()
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
- public void testDimensionRangePartitionsSpecIsValid()
+ public void testStringDimensionInRangePartitionsSpecIsValid()
{
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
@@ -149,7 +175,29 @@ public void testDimensionRangePartitionsSpecIsValid()
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
+ }
+
+ @Test
+ public void testLongDimensionInRangePartitionsSpecIsInvalid()
+ {
+    List<String> longPartitionDimension = Collections.singletonList(LONG_DIMENSION.getName());
+ CompactionTask compactionTask = createCompactionTask(
+ new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, longPartitionDimension, false),
+ null,
+ Collections.emptyMap(),
+ null,
+ null
+ );
+
+    CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+        compactionTask,
+        INTERVAL_DATASCHEMAS
+    );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec",
+ validationResult.getReason()
+ );
}
@Test
@@ -162,7 +210,7 @@ public void testMaxTotalRowsIsInvalid()
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -175,7 +223,7 @@ public void testDynamicPartitionsSpecIsValid()
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -188,7 +236,7 @@ public void testQueryGranularityAllIsValid()
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -201,7 +249,7 @@ public void testRollupFalseWithMetricsSpecIsInValid()
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -214,7 +262,7 @@ public void testRollupTrueWithoutMetricsSpecIsInValid()
new ClientCompactionTaskGranularitySpec(null, null, true),
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -227,13 +275,16 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
- new ClientCompactionTaskGranularitySpec(null, null, null),
+ new ClientCompactionTaskGranularitySpec(null, null, true),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
- CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+ compactionTask,
+ INTERVAL_DATASCHEMAS
+ );
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
+ "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
index 8d30a60d04e6..0abaeed8eb27 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -57,6 +57,9 @@ TaskStatus runCompactionTasks(
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
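+ * @param intervalToDataSchemaMap map from each compaction interval to the DataSchema derived for it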
*/
- CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
+ CompactionConfigValidationResult validateCompactionTask(
+ CompactionTask compactionTask,
+      Map<Interval, DataSchema> intervalToDataSchemaMap
+ );
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index b3c01d79f98b..4594fc1e9b25 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -470,7 +470,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
- CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
+ CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(
+ this,
+ intervalDataSchemas
+ );
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 2074d14f0f90..541f24fe0889 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -85,7 +85,8 @@ public CurrentSubTaskHolder getCurrentSubTaskHolder()
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask
+ CompactionTask compactionTask,
+      Map<Interval, DataSchema> intervalDataSchemaMap
)
{
return CompactionConfigValidationResult.success();
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 806b35e94819..f6a009afe1ce 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -21,12 +21,14 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -36,6 +38,9 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
@@ -102,16 +107,20 @@ public static CompactionConfigValidationResult validateCompactionConfig(
* Checks if the provided compaction config is supported by MSQ. The following configs aren't supported:
*
* - partitionsSpec of type HashedPartitionsSpec.
+ * - 'range' partitionsSpec with non-string partition dimensions.
* - maxTotalRows in DynamicPartitionsSpec.
- * - rollup in granularitySpec set to false when metricsSpec is specified or true when it's empty.
- * - any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.
+ * - Rollup set to true without a metricsSpec being specified, or vice versa.
+ * - Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.
*
*/
private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (newConfig.getTuningConfig() != null) {
- validationResults.add(validatePartitionsSpecForMSQ(newConfig.getTuningConfig().getPartitionsSpec()));
+ validationResults.add(validatePartitionsSpecForMSQ(
+ newConfig.getTuningConfig().getPartitionsSpec(),
+ newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
+ ));
}
if (newConfig.getGranularitySpec() != null) {
validationResults.add(validateRollupForMSQ(
@@ -128,9 +137,13 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn
}
/**
- * Validate that partitionSpec is either 'dynamic` or 'range', and if 'dynamic', ensure 'maxTotalRows' is null.
+ * Validate that partitionsSpec is either 'dynamic' or 'range'. If 'dynamic', ensure 'maxTotalRows' is null; if
+ * 'range', ensure all partition columns are of string type.
*/
- public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(PartitionsSpec partitionsSpec)
+ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
+ @Nullable PartitionsSpec partitionsSpec,
+      @Nullable List<DimensionSchema> dimensionSchemas
+ )
{
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
@@ -146,11 +159,28 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(Part
"MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
);
}
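+    // MSQ supports only string columns as 'range' partition dimensions, so flag the first non-string one.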
+ if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) {
+      Map<String, DimensionSchema> dimensionSchemaMap = dimensionSchemas.stream().collect(
+ Collectors.toMap(DimensionSchema::getName, Function.identity())
+ );
+      Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
+ .getPartitionDimensions()
+ .stream()
+ .filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
+ .findAny();
+ if (nonStringDimension.isPresent()) {
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec",
+ nonStringDimension.get(),
+ dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
+ );
+ }
+ }
return CompactionConfigValidationResult.success();
}
/**
- * Validate rollup in granularitySpec is set to true when metricsSpec is specified and false if it's null.
+ * Validate rollup in granularitySpec is set to true iff metricsSpec is specified.
* If rollup set to null, all existing segments are analyzed, and it's set to true iff all segments have rollup
* set to true.
*/
@@ -159,13 +189,9 @@ public static CompactionConfigValidationResult validateRollupForMSQ(
@Nullable Boolean isRollup
)
{
- if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) {
- return CompactionConfigValidationResult.failure(
- "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
- );
- } else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) {
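+    // A null rollup is not treated as true here: rollup must be explicitly true whenever a metricsSpec is present.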
+ if ((metricsSpec != null && metricsSpec.length > 0) != Boolean.TRUE.equals(isRollup)) {
return CompactionConfigValidationResult.failure(
- "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null"
+ "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified"
);
}
return CompactionConfigValidationResult.success();
@@ -190,7 +216,7 @@ public static CompactionConfigValidationResult validateMaxNumTasksForMSQ(Map
CompactionConfigValidationResult.failure(
- "MSQ: Non-idempotent aggregator[%s] not supported in 'metricsSpec'.",
+ "MSQ: Aggregator[%s] not supported in 'metricsSpec'",
aggregatorFactory.getName()
)
).orElse(CompactionConfigValidationResult.success());
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index 2bc6d251f06b..fd53ed38c257 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -25,7 +25,7 @@
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.config.Configs;
-import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -44,6 +44,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Represents the status of compaction for a given {@link CompactionCandidate}.
@@ -230,6 +231,21 @@ static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuni
}
}
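+  /**
+   * Returns the given dimensions with any 'range' partition dimensions filtered out, since partition dimensions
+   * are compared separately from the rest of the sort order.
+   */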
+  private static List<DimensionSchema> getNonPartitioningDimensions(
+      @Nullable final List<DimensionSchema> dimensionSchemas,
+ @Nullable final PartitionsSpec partitionsSpec
+ )
+ {
+ if (dimensionSchemas == null || !(partitionsSpec instanceof DimensionRangePartitionsSpec)) {
+ return dimensionSchemas;
+ }
+
+    final List<String> partitionsDimensions = ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions();
+ return dimensionSchemas.stream()
+ .filter(dim -> !partitionsDimensions.contains(dim.getName()))
+ .collect(Collectors.toList());
+ }
+
/**
* Converts to have only the effective maxRowsPerSegment to avoid false positives when targetRowsPerSegment is set but
* effectively translates to the same maxRowsPerSegment.
@@ -389,18 +405,34 @@ private CompactionStatus queryGranularityIsUpToDate()
}
}
+  /**
+   * Removes partition dimensions before the comparison, since they are moved to the front of the sort order --
+   * which can create a mismatch between the expected and the actual order of dimensions. Partition dimensions
+   * are covered separately by the {@link Evaluator#partitionsSpecIsUpToDate()} check.
+   */
private CompactionStatus dimensionsSpecIsUpToDate()
{
if (compactionConfig.getDimensionsSpec() == null) {
return COMPLETE;
} else {
- final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
- return CompactionStatus.completeIfEqual(
- "dimensionsSpec",
+      List<DimensionSchema> existingDimensions = getNonPartitioningDimensions(
+ lastCompactionState.getDimensionsSpec() == null
+ ? null
+ : lastCompactionState.getDimensionsSpec().getDimensions(),
+ lastCompactionState.getPartitionsSpec()
+ );
+      List<DimensionSchema> configuredDimensions = getNonPartitioningDimensions(
compactionConfig.getDimensionsSpec().getDimensions(),
- existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions(),
- String::valueOf
+ compactionConfig.getTuningConfig() == null ? null : compactionConfig.getTuningConfig().getPartitionsSpec()
);
+      return CompactionStatus.completeIfEqual(
+          "dimensionsSpec",
+          configuredDimensions,
+          existingDimensions,
+          String::valueOf
+      );
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b347a57dcb6c..035286692bf5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -86,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
* Must be the same as org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
+ private static final String COMPACTION_REASON_KEY = "compactionReason";
private static final Logger LOG = new Logger(CompactSegments.class);
@@ -567,6 +568,10 @@ private int submitCompactionTasks(
slotsRequiredForCurrentTask = findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig());
}
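+      // Record the reason this candidate needs compaction in the task context, so it is visible in the task payload.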
+ if (entry.getCurrentStatus() != null) {
+ autoCompactionContext.put(COMPACTION_REASON_KEY, entry.getCurrentStatus().getReason());
+ }
+
final String taskId = compactSegments(
entry,
config.getTaskPriority(),
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index 011a4640da37..b1f065422805 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -21,6 +21,8 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -36,6 +38,7 @@
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
@@ -45,6 +48,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class ClientCompactionRunnerInfoTest
@@ -56,6 +60,7 @@ public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
+ null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -76,6 +81,7 @@ public void testMSQEngineWithMaxTotalRowsIsInvalid()
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
+ null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -96,6 +102,7 @@ public void testMSQEngineWithDynamicPartitionsSpecIsValid()
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@@ -103,18 +110,40 @@ public void testMSQEngineWithDynamicPartitionsSpecIsValid()
}
@Test
- public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
+ public void testMSQEngineWithStringDimensionsInRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
.isValid());
}
+ @Test
+  public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsInvalid()
+ {
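+    // 'partitionDim' is declared as a long dimension, which 'range' partitioning under MSQ rejects.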
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+ new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
+ Collections.emptyMap(),
+ null,
+ null,
+ ImmutableList.of(new LongDimensionSchema("partitionDim"))
+ );
+ CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-string partition dimension[partitionDim] of type[long] not supported with 'range' partition spec",
+ validationResult.getReason()
+ );
+ }
+
@Test
public void testMSQEngineWithQueryGranularityAllIsValid()
{
@@ -122,6 +151,7 @@ public void testMSQEngineWithQueryGranularityAllIsValid()
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false),
+ null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@@ -135,7 +165,8 @@ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid()
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
- new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
+ new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
+ null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
@@ -143,7 +174,7 @@ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid()
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified",
+ "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified",
validationResult.getReason()
);
}
@@ -155,6 +186,7 @@ public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid()
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, true),
+ null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -163,7 +195,7 @@ public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid()
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
+ "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified",
validationResult.getReason()
);
}
@@ -177,8 +209,9 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
- new UserCompactionTaskGranularityConfig(null, null, null),
- new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
+ null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
@@ -186,29 +219,38 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
+ "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}
@Test
- public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
+ public void testMSQEngineWithRollupNullWithMetricsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
- new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
+ new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
+ null
+ );
+ CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified",
+ validationResult.getReason()
);
- Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
- .isValid());
}
private static DataSourceCompactionConfig createMSQCompactionConfig(
PartitionsSpec partitionsSpec,
Map<String, Object> context,
@Nullable UserCompactionTaskGranularityConfig granularitySpec,
- @Nullable AggregatorFactory[] metricsSpec
+ @Nullable AggregatorFactory[] metricsSpec,
+      @Nullable List<DimensionSchema> dimensions
)
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
@@ -219,7 +261,7 @@ private static DataSourceCompactionConfig createMSQCompactionConfig(
new Period(3600),
createTuningConfig(partitionsSpec),
granularitySpec,
- null,
+ new UserCompactionTaskDimensionsConfig(dimensions),
metricsSpec,
null,
null,
diff --git a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
index 7580582685b0..5659a0ff5bfc 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
@@ -28,6 +28,7 @@
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -1137,6 +1138,82 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentDim
Assert.assertFalse(iterator.hasNext());
}
+ @Test
+  public void testIteratorDoesNotReturnSegmentsWhenPartitionDimensionsPrefixed()
+ {
+ // Same indexSpec as what is set in the auto compaction config
+    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
+ // Set range partitions spec with dimensions ["dim2", "dim4"] -- the same as what is set in the auto compaction config
+ PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
+ null,
+ Integer.MAX_VALUE,
+ ImmutableList.of("dim2", "dim4"),
+ false
+ );
+
+ // Create segments that were compacted (CompactionState != null) and have
+ // Dimensions=["dim2", "dim4", "dim3", "dim1"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+ // Dimensions=["dim2", "dim4", "dim1", "dim3"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+ final SegmentTimeline timeline = createTimeline(
+ createSegments()
+ .startingAt("2017-10-01")
+ .withNumPartitions(4)
+ .withCompactionState(
+ new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim3", "dim1"))), null, null, indexSpec, null)
+ ),
+ createSegments()
+ .startingAt("2017-10-02")
+ .withNumPartitions(4)
+ .withCompactionState(
+ new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim1", "dim3"))), null, null, indexSpec, null)
+ )
+ );
+
+ // Auto compaction config sets Dimensions=["dim1", "dim2", "dim3", "dim4"] and partition dimensions as ["dim2", "dim4"]
+ CompactionSegmentIterator iterator = createIterator(
+ configBuilder().withDimensionsSpec(
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "dim4")))
+ )
+ .withTuningConfig(
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ 1000L,
+ null,
+ partitionsSpec,
+ IndexSpec.DEFAULT,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+ .build(),
+ timeline
+ );
+    // We should get only interval 2017-10-01T00:00:00/2017-10-02T00:00:00, since
+    // 2017-10-02T00:00:00/2017-10-03T00:00:00 already has the expected dimension order once partition dimensions
+    // are moved to the front.
+ Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next().getSegments())
+ );
+ // No more
+ Assert.assertFalse(iterator.hasNext());
+ }
+
@Test
public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentFilter() throws Exception
{