Skip to content

Commit

Permalink
Fail MSQ compaction if multi-valued partition dimensions are found (a…
Browse files Browse the repository at this point in the history
…pache#17344)

MSQ currently supports only single-valued string dimensions as partition keys.
This patch adds a check to ensure that partition keys are single-valued in case
this info is available by virtue of segment download for schema inference.

During compaction, if MSQ finds multi-valued dimensions (MVDs) declared as part
of `range` partitionsSpec, it switches partitioning type to dynamic, ending up in
repeated compactions of the same interval. To avoid this scenario, the segment
download logic is also updated to always download segments if info on multi-valued
dimensions is required.
  • Loading branch information
gargvishesh authored Oct 19, 2024
1 parent 9a16d4e commit 5da9949
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
Expand Down Expand Up @@ -84,6 +85,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -130,7 +132,7 @@ public MSQCompactionRunner(
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedParititionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>'range' partitionsSpec with multi-valued or non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
Expand All @@ -153,13 +155,24 @@ public CompactionConfigValidationResult validateCompactionTask(
);
}
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
DataSchema dataSchema = Iterables.getOnlyElement(intervalToDataSchemaMap.values());
if (compactionTask.getTuningConfig() != null) {
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
dataSchema.getDimensionsSpec().getDimensions()
)
);
validationResults.add(
validatePartitionDimensionsAreNotMultiValued(
compactionTask.getTuningConfig().getPartitionsSpec(),
dataSchema.getDimensionsSpec(),
dataSchema instanceof CombinedDataSchema
? ((CombinedDataSchema) dataSchema).getMultiValuedDimensions()
: null
)
);

}
if (compactionTask.getGranularitySpec() != null) {
validationResults.add(ClientCompactionRunnerInfo.validateRollupForMSQ(
Expand All @@ -175,6 +188,32 @@ public CompactionConfigValidationResult validateCompactionTask(
.orElse(CompactionConfigValidationResult.success());
}

private CompactionConfigValidationResult validatePartitionDimensionsAreNotMultiValued(
PartitionsSpec partitionsSpec,
DimensionsSpec dimensionsSpec,
Set<String> multiValuedDimensions
)
{
List<String> dimensionSchemas = dimensionsSpec.getDimensionNames();
if (partitionsSpec instanceof DimensionRangePartitionsSpec
&& dimensionSchemas != null
&& multiValuedDimensions != null
&& !multiValuedDimensions.isEmpty()) {
Optional<String> multiValuedDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
.getPartitionDimensions()
.stream()
.filter(multiValuedDimensions::contains)
.findAny();
if (multiValuedDimension.isPresent()) {
return CompactionConfigValidationResult.failure(
"MSQ: Multi-valued string partition dimension[%s] not supported with 'range' partition spec",
multiValuedDimension.get()
);
}
}
return CompactionConfigValidationResult.success();
}

@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
Expand Down Expand Up @@ -109,11 +110,15 @@ public class MSQCompactionRunnerTest
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
COMPACTION_INTERVAL,
new DataSchema.Builder()
.withDataSource(DATA_SOURCE)
.withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
.withDimensions(new DimensionsSpec(DIMENSIONS))
.build()
new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
null,
null,
null,
ImmutableSet.of(MV_STRING_DIMENSION.getName())
)
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
Expand All @@ -139,6 +144,7 @@ public void testMultipleDisjointCompactionIntervalsAreInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
Expand All @@ -160,6 +166,7 @@ public void testHashedPartitionsSpecIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -173,6 +180,7 @@ public void testStringDimensionInRangePartitionsSpecIsValid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -187,6 +195,7 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);

Expand All @@ -200,6 +209,29 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
);
}

@Test
public void testMultiValuedDimensionInRangePartitionsSpecIsInvalid()
{
List<String> mvStringPartitionDimension = Collections.singletonList(MV_STRING_DIMENSION.getName());
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, mvStringPartitionDimension, false),
null,
Collections.emptyMap(),
null,
null,
null
);

CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Multi-valued string partition dimension[mv_string_dim] not supported with 'range' partition spec",
validationResult.getReason()
);
}

@Test
public void testMaxTotalRowsIsInvalid()
{
Expand All @@ -208,6 +240,7 @@ public void testMaxTotalRowsIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -221,6 +254,7 @@ public void testDynamicPartitionsSpecIsValid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -234,6 +268,7 @@ public void testQueryGranularityAllIsValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -247,26 +282,28 @@ public void testRollupFalseWithMetricsSpecIsInValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, false),
null,
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testRollupTrueWithoutMetricsSpecIsInValid()
public void testRollupTrueWithoutMetricsSpecIsInvalid()
{
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
{
// Aggregators having different input and ouput column names are unsupported.
final String inputColName = "added";
Expand All @@ -276,6 +313,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null,
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
Expand All @@ -292,7 +330,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
@Test
public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception
{
CompactionTask compactionTask = createCompactionTask(null, null, Collections.emptyMap(), null, null);
CompactionTask compactionTask = createCompactionTask(null, null, Collections.emptyMap(), null, null, null);
TaskStatus taskStatus = MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, Collections.emptyMap(), null);
Assert.assertTrue(taskStatus.isFailure());
}
Expand All @@ -307,6 +345,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce
dimFilter,
Collections.emptyMap(),
null,
null,
null
);

Expand Down Expand Up @@ -384,6 +423,7 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess
dimFilter,
Collections.emptyMap(),
null,
null,
null
);

Expand Down Expand Up @@ -481,6 +521,7 @@ private CompactionTask createCompactionTask(
@Nullable DimFilter dimFilter,
Map<String, Object> contextParams,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable List<DimensionSchema> dimensionSchemas,
@Nullable AggregatorFactory[] metricsSpec
)
{
Expand All @@ -504,6 +545,7 @@ private CompactionTask createCompactionTask(
))
.transformSpec(transformSpec)
.granularitySpec(granularitySpec)
.dimensionsSpec(new DimensionsSpec(dimensionSchemas))
.metricsSpec(metricsSpec)
.compactionRunner(MSQ_COMPACTION_RUNNER)
.context(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
Expand Down Expand Up @@ -80,6 +81,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -452,6 +454,50 @@ public boolean isPerfectRollup()
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}

/**
* Checks if multi-valued string dimensions need to be analyzed by downloading the segments.
* This method returns true only for MSQ engine when either of the following holds true:
* <ul>
* <li> Range partitioning is done on a string dimension or an unknown dimension
* (since MSQ does not support partitioning on a multi-valued string dimension) </li>
* <li> Rollup is done on a string dimension or an unknown dimension
* (since MSQ requires multi-valued string dimensions to be converted to arrays for rollup) </li>
* </ul>
* @return false for native engine, true for MSQ engine only when partitioning or rollup is done on a string
* or unknown dimension.
*/
boolean identifyMultiValuedDimensions()
{
if (compactionRunner instanceof NativeCompactionRunner) {
return false;
}
// Rollup can be true even when granularitySpec is not known since rollup is then decided based on segment analysis
final boolean isPossiblyRollup = granularitySpec == null || !Boolean.FALSE.equals(granularitySpec.isRollup());
boolean isRangePartitioned = tuningConfig != null
&& tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec;

if (dimensionsSpec == null || dimensionsSpec.getDimensions().isEmpty()) {
return isPossiblyRollup || isRangePartitioned;
} else {
boolean isRollupOnStringDimension = isPossiblyRollup &&
dimensionsSpec.getDimensions()
.stream()
.anyMatch(dim -> ColumnType.STRING.equals(dim.getColumnType()));

boolean isPartitionedOnStringDimension =
isRangePartitioned &&
dimensionsSpec.getDimensions()
.stream()
.anyMatch(
dim -> ColumnType.STRING.equals(dim.getColumnType())
&& ((DimensionRangePartitionsSpec) tuningConfig.getPartitionsSpec())
.getPartitionDimensions()
.contains(dim.getName())
);
return isRollupOnStringDimension || isPartitionedOnStringDimension;
}
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
Expand All @@ -466,7 +512,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
metricsSpec,
granularitySpec,
getMetricBuilder(),
!(compactionRunner instanceof NativeCompactionRunner)
this.identifyMultiValuedDimensions()
);

registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
Expand Down Expand Up @@ -794,23 +840,25 @@ static class ExistingSegmentAnalyzer
this.needMultiValuedDimensions = needMultiValuedDimensions;
}

private boolean shouldFetchSegments()
/**
* Segments are downloaded even when just needMultiValuedDimensions=true since MSQ switches to dynamic partitioning
* on finding any 'range' partition dimension to be multivalued at runtime, which ends up in a mismatch between
* the compaction config and the actual segments (lastCompactionState), leading to repeated compactions.
*/
private boolean shouldDownloadSegments()
{
// Don't fetch segments for just needMultiValueDimensions
return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec;

return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec || needMultiValuedDimensions;
}

public void fetchAndProcessIfNeeded()
{
if (!shouldFetchSegments()) {
if (!shouldDownloadSegments()) {
// Nothing to do; short-circuit and don't fetch segments.
return;
}

if (needMultiValuedDimensions) {
multiValuedDimensions = new HashSet<>();
}

multiValuedDimensions = new HashSet<>();
final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments = sortSegmentsListNewestFirst();

for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> segmentPair : segments) {
Expand Down
Loading

0 comments on commit 5da9949

Please sign in to comment.