Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ public static boolean isInferInvalidSegmentPartition(Map<String, String> queryOp
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.INFER_INVALID_SEGMENT_PARTITION, "false"));
}

/**
 * Reads the {@code INFER_REALTIME_SEGMENT_PARTITION} query option.
 *
 * @param queryOptions query options to look the key up in
 * @return the option value parsed as a boolean; {@code false} when the option is absent
 */
public static boolean isInferRealtimeSegmentPartition(Map<String, String> queryOptions) {
  String rawValue = queryOptions.getOrDefault(QueryOptionKey.INFER_REALTIME_SEGMENT_PARTITION, "false");
  return Boolean.parseBoolean(rawValue);
}

@Nullable
private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
if (optionValue == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long requ
routingTableMap.keySet());
boolean inferInvalidPartitionSegment = QueryOptionsUtils.isInferInvalidSegmentPartition(
_physicalPlannerContext.getQueryOptions());
boolean inferRealtimeSegmentPartition = QueryOptionsUtils.isInferRealtimeSegmentPartition(
_physicalPlannerContext.getQueryOptions());
TableScanWorkerAssignmentResult workerAssignmentResult = assignTableScan(tableName, fieldNames,
instanceIdToSegments, tablePartitionInfoMap, inferInvalidPartitionSegment);
instanceIdToSegments, tablePartitionInfoMap, inferInvalidPartitionSegment, inferRealtimeSegmentPartition);
TableScanMetadata metadata = new TableScanMetadata(Set.of(tableName), workerAssignmentResult._workerIdToSegmentsMap,
tableOptions, segmentUnavailableMap, timeBoundaryInfo);
return tableScan.with(workerAssignmentResult._pinotDataDistribution, metadata);
Expand All @@ -196,7 +198,7 @@ private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long requ
@VisibleForTesting
static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<String> fieldNames,
InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap,
boolean inferInvalidPartitionSegment) {
boolean inferInvalidPartitionSegment, boolean inferRealtimeSegmentPartition) {
Set<String> tableTypes = instanceIdToSegments.getActiveTableTypes();
Set<String> partitionedTableTypes = tableTypes.stream().filter(tpiMap::containsKey).collect(Collectors.toSet());
Preconditions.checkState(!tableTypes.isEmpty(), "No routing entry for offline or realtime type");
Expand All @@ -209,7 +211,7 @@ static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<St
TablePartitionInfo tpi = tpiMap.get(tableType);
TableScanWorkerAssignmentResult assignmentResult = attemptPartitionedDistribution(tableNameWithType,
fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)), tpi,
inferInvalidPartitionSegment);
inferInvalidPartitionSegment, inferRealtimeSegmentPartition);
if (tpi != null && CollectionUtils.isNotEmpty(tpi.getSegmentsWithInvalidPartition())) {
// Use SegmentPartitionMetadataManager's logs to find the segments with invalid partitions.
BROKER_METRICS.addMeteredTableValue(tableNameWithType, BrokerMeter.INVALID_SEGMENT_PARTITION_IN_QUERY,
Expand Down Expand Up @@ -259,13 +261,22 @@ static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<St
@VisibleForTesting
static TableScanWorkerAssignmentResult attemptPartitionedDistribution(String tableNameWithType,
List<String> fieldNames, Map<String, List<String>> instanceIdToSegmentsMap,
@Nullable TablePartitionInfo tablePartitionInfo, boolean inferInvalidSegmentPartition) {
@Nullable TablePartitionInfo tablePartitionInfo, boolean inferInvalidSegmentPartition,
boolean inferRealtimeSegmentPartition) {
if (tablePartitionInfo == null) {
return null;
}
String tableType =
Objects.requireNonNull(TableNameBuilder.getTableTypeFromTableName(tableNameWithType),
"Illegal state: expected table name with type").toString();
if (TableType.valueOf(tableType) == TableType.REALTIME && inferRealtimeSegmentPartition) {
// If we are inferring partitioning for realtime segments, we need to build the TPI with inferred partitions.
tablePartitionInfo = buildTablePartitionInfoWithInferredPartitions(tableNameWithType, instanceIdToSegmentsMap,
tablePartitionInfo);
if (tablePartitionInfo == null) {
return null;
}
}
int numPartitions = tablePartitionInfo.getNumPartitions();
int keyIndex = fieldNames.indexOf(tablePartitionInfo.getPartitionColumn());
String function = tablePartitionInfo.getPartitionFunctionName();
Expand Down Expand Up @@ -357,6 +368,36 @@ static TableScanWorkerAssignmentResult attemptPartitionedDistribution(String tab
return new TableScanWorkerAssignmentResult(dataDistribution, workerIdToSegmentsMap);
}

/**
 * Builds a {@link TablePartitionInfo} for a realtime table in which the partition of every segment in the passed
 * map is inferred via {@code inferPartitionId}, rather than taken from the passed {@code tablePartitionInfo}.
 *
 * <p>The partition column, partition function name and number of partitions are copied from the passed
 * {@code tablePartitionInfo}. The returned info carries an empty list of segments with invalid partition.
 *
 * @param tableNameWithType table name with type suffix; must resolve to the REALTIME table type
 * @param instanceIdToSegmentsMap map whose values are the segment names to distribute across partitions
 * @param tablePartitionInfo source of the partition column, partition function name and partition count
 * @return the rebuilt partition info, or {@code null} if {@code inferPartitionId} returns -1 for any segment
 * @throws IllegalStateException if {@code tableNameWithType} is not a realtime table
 */
@VisibleForTesting
@Nullable
static TablePartitionInfo buildTablePartitionInfoWithInferredPartitions(String tableNameWithType,
    Map<String, List<String>> instanceIdToSegmentsMap, TablePartitionInfo tablePartitionInfo) {
  Preconditions.checkState(TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType)),
      "Table %s is not a realtime table. Cannot infer partitions for invalid segments", tableNameWithType);
  String column = tablePartitionInfo.getPartitionColumn();
  String functionName = tablePartitionInfo.getPartitionFunctionName();
  int partitionCount = tablePartitionInfo.getNumPartitions();
  // One (initially empty) bucket per partition, indexed by partition id.
  List<List<String>> buckets = new ArrayList<>();
  while (buckets.size() < partitionCount) {
    buckets.add(new ArrayList<>());
  }
  // The instance a segment is assigned to is irrelevant here; only the segment names are needed.
  for (List<String> assignedSegments : instanceIdToSegmentsMap.values()) {
    for (String segmentName : assignedSegments) {
      int inferredPartition = inferPartitionId(segmentName, partitionCount);
      if (inferredPartition == -1) {
        // A single segment whose partition cannot be inferred makes the whole attempt fail.
        return null;
      }
      buckets.get(inferredPartition).add(segmentName);
    }
  }
  return new TablePartitionInfo(tableNameWithType, column, functionName, partitionCount, buckets, List.of());
}

/**
* Infers partition from invalid segments if the passed flag is set to true. Inference is done by simply:
* <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
Expand Down Expand Up @@ -74,7 +76,7 @@ public class LeafStageWorkerAssignmentRuleTest {
@Test
public void testAssignTableScanWithUnPartitionedOfflineTable() {
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of(), false);
OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of(), false, false);
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
Expand All @@ -85,7 +87,7 @@ public void testAssignTableScanWithUnPartitionedOfflineTable() {
@Test
public void testAssignTableScanWithUnPartitionedRealtimeTable() {
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of(), false);
REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of(), false, false);
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
Expand All @@ -96,7 +98,7 @@ public void testAssignTableScanWithUnPartitionedRealtimeTable() {
@Test
public void testAssignTableScanWithUnPartitionedHybridTable() {
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of(), false);
HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of(), false, false);
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
Expand All @@ -108,7 +110,7 @@ public void testAssignTableScanWithUnPartitionedHybridTable() {
@Test
public void testAssignTableScanPartitionedOfflineTable() {
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo()), false);
OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo()), false, false);
// Basic checks.
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
Expand All @@ -124,7 +126,7 @@ public void testAssignTableScanPartitionedOfflineTable() {
@Test
public void testAssignTableScanPartitionedRealtimeTable() {
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME", createRealtimeTablePartitionInfo()), false);
REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME", createRealtimeTablePartitionInfo()), false, false);
// Basic checks.
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
Expand All @@ -146,7 +148,7 @@ public void testAssignTableScanPartitionedRealtimeTableWithSomeInvalidPartitionS
// Case-1: When inferInvalidPartitionSegment is set to true.
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS, Map.of("REALTIME",
createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), true);
createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), true, false);
// Basic checks.
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.HASH_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
Expand All @@ -163,7 +165,7 @@ public void testAssignTableScanPartitionedRealtimeTableWithSomeInvalidPartitionS
// Case-2: When inferInvalidPartitionSegment is set to false.
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS, Map.of("REALTIME",
createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), false);
createRealtimeTablePartitionInfo(List.of(INVALID_SEGMENT_PARTITION))), false, false);
// Basic checks.
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
Expand All @@ -178,7 +180,7 @@ public void testAssignTableScanPartitionedRealtimeTableWithSomeInvalidPartitionS
public void testAssignTableScanPartitionedHybridTable() {
TableScanWorkerAssignmentResult result = LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE", createOfflineTablePartitionInfo(),
"REALTIME", createRealtimeTablePartitionInfo()), false);
"REALTIME", createRealtimeTablePartitionInfo()), false, false);
assertEquals(result._pinotDataDistribution.getType(), RelDistribution.Type.RANDOM_DISTRIBUTED);
assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
assertEquals(result._pinotDataDistribution.getCollation(), RelCollations.EMPTY);
Expand All @@ -187,6 +189,50 @@ public void testAssignTableScanPartitionedHybridTable() {
validateTableScanAssignment(result, HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
}

/**
 * Checks that, when every segment is initially flagged as having an invalid partition, the rebuilt
 * TablePartitionInfo places each segment in the partition inferred for it and reports no invalid segments.
 */
@Test
public void testBuildTablePartitionInfoWithInferredPartitionsWhenRealtimeSegments() {
  // Segment names expected to land in partitions 3, 2 and 0 respectively (see assertions below).
  String llcSegmentForPartition3 =
      new LLCSegmentName(TABLE_NAME, 3, 35, System.currentTimeMillis() / 1000).getSegmentName();
  String llcSegmentForPartition2 =
      new LLCSegmentName(TABLE_NAME, 2, 35, System.currentTimeMillis() / 1000).getSegmentName();
  String uploadedSegmentForPartition0 = new UploadedRealtimeSegmentName(
      String.format("uploaded__%s__0__20240530T0000Z__suffix", TABLE_NAME)).getSegmentName();
  List<String> allSegments =
      List.of(llcSegmentForPartition3, llcSegmentForPartition2, uploadedSegmentForPartition0);
  int partitionCount = 4;
  // Input partition info marks all segments as invalid, simulating the case when the segmentPartitionConfig
  // has just been added to a Realtime table.
  TablePartitionInfo allInvalidInfo = new TablePartitionInfo("foobar_REALTIME", PARTITION_COLUMN,
      PARTITION_FUNCTION, partitionCount, List.of(), allSegments /* invalid partition segments */);
  Map<String, List<String>> segmentsByInstance = Map.of("instance-0", allSegments);

  TablePartitionInfo inferred = LeafStageWorkerAssignmentRule.buildTablePartitionInfoWithInferredPartitions(
      TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), segmentsByInstance, allInvalidInfo);

  assertNotNull(inferred);
  assertEquals(inferred.getNumPartitions(), partitionCount);
  assertEquals(inferred.getSegmentsWithInvalidPartition(), List.of());
  // Expected contents of partitions 0..3, in order.
  List<List<String>> expectedByPartition = List.of(List.of(uploadedSegmentForPartition0), List.of(),
      List.of(llcSegmentForPartition2), List.of(llcSegmentForPartition3));
  for (int partition = 0; partition < expectedByPartition.size(); partition++) {
    assertEquals(inferred.getSegmentsByPartition().get(partition), expectedByPartition.get(partition));
  }
}

/**
 * Checks that no TablePartitionInfo is built (null is returned) when the only segment is the name
 * {@code foobar_101_35__20250509T1Z}, used here as a non-conforming realtime segment name.
 */
@Test
public void testBuildTablePartitionInfoWithInferredPartitionsWhenInvalidRealtimeSegmentName() {
  List<String> malformedSegments = List.of("foobar_101_35__20250509T1Z");
  int partitionCount = 4;
  TablePartitionInfo allInvalidInfo = new TablePartitionInfo("foobar_REALTIME", PARTITION_COLUMN,
      PARTITION_FUNCTION, partitionCount, List.of(), malformedSegments /* invalid partition segments */);
  Map<String, List<String>> segmentsByInstance = Map.of("instance-0", malformedSegments);

  TablePartitionInfo inferred = LeafStageWorkerAssignmentRule.buildTablePartitionInfoWithInferredPartitions(
      TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), segmentsByInstance, allInvalidInfo);

  assertNull(inferred);
}

@Test
public void testGetInvalidSegmentsByInferredPartitionWhenSegmentNamesDontConform() {
final int numPartitions = 4; // arbitrary for this test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,16 @@ public static class QueryOptionKey {
// records to stream partitions, which can make a segment have multiple partitions. The scale of this is
// usually low, and this query option allows the MSE Optimizer to infer the partition of a segment based on its
// name, when that segment has multiple partitions in its columnPartitionMap.
@Deprecated
public static final String INFER_INVALID_SEGMENT_PARTITION = "inferInvalidSegmentPartition";
// For realtime tables, this infers the segment partition for all segments, not only for segments with an
// invalid partition. The partition column, partition function and number of partitions are still taken from
// the table's segmentPartitionConfig. This is useful when the stream doesn't guarantee 100% accuracy for
// stream partition assignment. In such scenarios, if upsert compaction is not enabled,
// inferInvalidSegmentPartition will suffice. With compaction enabled, however, a segment may be left with
// only the invalid partition records after compaction, which can change the partition of a segment from
// something like [1, 3, 5] to [5], even though the segment was supposed to be in partition-1.
public static final String INFER_REALTIME_SEGMENT_PARTITION = "inferRealtimeSegmentPartition";
public static final String USE_LITE_MODE = "useLiteMode";
// Used by the MSE Engine to determine whether to use the broker pruning logic. Only supported by the
// new MSE query optimizer.
Expand Down
Loading