Skip to content

Commit

Permalink
Spark upsert table backfill support (apache#14443)
Browse files (browse the repository at this point in the history)
  • Loading branch information
pengding-stripe authored Nov 18, 2024
1 parent 179be30 commit 776e815
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ private SegmentGenerationUtils() {
}

private static final String OFFLINE = "OFFLINE";
private static final String REALTIME = "REALTIME";
public static final String PINOT_PLUGINS_TAR_GZ = "pinot-plugins.tar.gz";
public static final String PINOT_PLUGINS_DIR = "pinot-plugins-dir";

Expand Down Expand Up @@ -139,6 +140,8 @@ public static TableConfig getTableConfig(String tableConfigURIStr, String authTo
}
if (tableJsonNode.has(OFFLINE)) {
tableJsonNode = tableJsonNode.get(OFFLINE);
} else if (tableJsonNode.has(REALTIME)) {
tableJsonNode = tableJsonNode.get(REALTIME);
}
try {
return JsonUtils.jsonNodeToObject(tableJsonNode, TableConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,29 @@ private SegmentNameGenerator getSegmentNameGenerator(SegmentGeneratorConfig segm
return new InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN),
segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), inputFileUri, appendUUIDToSegmentName);
case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
Preconditions.checkState(segmentGeneratorConfig.getCreationTime() != null,
"Creation time must be set for uploaded realtime segment name generator");
Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() != -1,
Preconditions.checkState(segmentNameGeneratorConfigs.get(BatchConfigProperties.SEGMENT_PARTITION_ID) != null,
"Valid partition id must be set for uploaded realtime segment name generator");
long creationTime;
String uploadTimeString = segmentNameGeneratorConfigs.get(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS);
if (uploadTimeString == null) {
uploadTimeString = segmentGeneratorConfig.getCreationTime();
}
Preconditions.checkState(uploadTimeString != null,
"Upload time must be set for uploaded realtime segment name generator");
long uploadTime;
try {
uploadTime = Long.parseLong(uploadTimeString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Upload time must be a valid long value in segmentNameGeneratorSpec");
}
int partitionId;
try {
creationTime = Long.parseLong(segmentGeneratorConfig.getCreationTime());
partitionId = Integer.parseInt(segmentNameGeneratorConfigs.get(BatchConfigProperties.SEGMENT_PARTITION_ID));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Creation time must be a valid long value in segmentGeneratorConfig");
throw new IllegalArgumentException("Partition Id must be a valid integer value in segmentNameGeneratorSpec");
}
return new UploadedRealtimeSegmentNameGenerator(tableName,
segmentGeneratorConfig.getUploadedSegmentPartitionId(), creationTime,
segmentGeneratorConfig.getSegmentNamePrefix(), segmentGeneratorConfig.getSegmentNamePostfix());
return new UploadedRealtimeSegmentNameGenerator(tableName, partitionId, uploadTime,
segmentNameGeneratorConfigs.get(SEGMENT_NAME_PREFIX),
segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX));
default:
throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
}
Expand Down

0 comments on commit 776e815

Please sign in to comment.