From 776e8153815667bf3cfd98fabbdb074dc473e661 Mon Sep 17 00:00:00 2001 From: pengding-stripe <147551894+pengding-stripe@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:58:25 -0800 Subject: [PATCH] Spark upsert table backfill support (#14443) --- .../generation/SegmentGenerationUtils.java | 3 ++ .../common/SegmentGenerationTaskRunner.java | 28 +++++++++++++------ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java index 0e6f0234fbe0..981bd4d464d8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java @@ -48,6 +48,7 @@ private SegmentGenerationUtils() { } private static final String OFFLINE = "OFFLINE"; + private static final String REALTIME = "REALTIME"; public static final String PINOT_PLUGINS_TAR_GZ = "pinot-plugins.tar.gz"; public static final String PINOT_PLUGINS_DIR = "pinot-plugins-dir"; @@ -139,6 +140,8 @@ public static TableConfig getTableConfig(String tableConfigURIStr, String authTo } if (tableJsonNode.has(OFFLINE)) { tableJsonNode = tableJsonNode.get(OFFLINE); + } else if (tableJsonNode.has(REALTIME)) { + tableJsonNode = tableJsonNode.get(REALTIME); } try { return JsonUtils.jsonNodeToObject(tableJsonNode, TableConfig.class); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java index 37ff9bc47c2d..c18a8e3ce77e 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java @@ -167,19 +167,29 @@ private SegmentNameGenerator getSegmentNameGenerator(SegmentGeneratorConfig segm return new InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN), segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), inputFileUri, appendUUIDToSegmentName); case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME: - Preconditions.checkState(segmentGeneratorConfig.getCreationTime() != null, - "Creation time must be set for uploaded realtime segment name generator"); - Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() != -1, + Preconditions.checkState(segmentNameGeneratorConfigs.get(BatchConfigProperties.SEGMENT_PARTITION_ID) != null, "Valid partition id must be set for uploaded realtime segment name generator"); - long creationTime; + String uploadTimeString = segmentNameGeneratorConfigs.get(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS); + if (uploadTimeString == null) { + uploadTimeString = segmentGeneratorConfig.getCreationTime(); + } + Preconditions.checkState(uploadTimeString != null, + "Upload time must be set for uploaded realtime segment name generator"); + long uploadTime; + try { + uploadTime = Long.parseLong(uploadTimeString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Upload time must be a valid long value in segmentNameGeneratorSpec"); + } + int partitionId; try { - creationTime = Long.parseLong(segmentGeneratorConfig.getCreationTime()); + partitionId = Integer.parseInt(segmentNameGeneratorConfigs.get(BatchConfigProperties.SEGMENT_PARTITION_ID)); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Creation time must be a valid long value in segmentGeneratorConfig"); + throw new IllegalArgumentException("Partition Id must be a valid integer value in segmentNameGeneratorSpec"); } - return new UploadedRealtimeSegmentNameGenerator(tableName, - segmentGeneratorConfig.getUploadedSegmentPartitionId(), creationTime, - segmentGeneratorConfig.getSegmentNamePrefix(), segmentGeneratorConfig.getSegmentNamePostfix()); + return new UploadedRealtimeSegmentNameGenerator(tableName, partitionId, uploadTime, + segmentNameGeneratorConfigs.get(SEGMENT_NAME_PREFIX), + segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX)); default: throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType); }