Skip to content

Commit

Permalink
Using local copy of segment instead of downloading from remote (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
rajagopr authored and davecromberge committed Nov 22, 2024
1 parent 4855262 commit eb1fe53
Showing 1 changed file with 14 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
Expand All @@ -79,6 +80,7 @@
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";
private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();

private static final int DEFUALT_PUSH_ATTEMPTS = 5;
private static final int DEFAULT_PUSH_PARALLELISM = 1;
Expand Down Expand Up @@ -284,14 +286,11 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
String.format("Uploading segment: %s (%d out of %d)", resultSegmentName, (i + 1), numOutputSegments));
String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.TAR.name());
URI outputSegmentTarURI;
if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
!= BatchConfigProperties.SegmentPushType.TAR) {
outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile);
URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile);
LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
outputSegmentTarURI);
} else {
outputSegmentTarURI = convertedTarredSegmentFile.toURI();
}

// Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
Expand All @@ -316,11 +315,12 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
}

if (batchSegmentUpload) {
updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult,
updateSegmentUriToTarPathMap(taskConfigs, convertedTarredSegmentFile.toURI(), segmentConversionResult,
segmentUriToTarPathMap, pushJobSpec);
} else {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
pushSegment(rawTableName, taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult);
pushSegment(rawTableName, taskConfigs, convertedTarredSegmentFile.toURI(), httpHeaders, parameters,
segmentConversionResult);
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
}
Expand Down Expand Up @@ -438,18 +438,15 @@ private void pushSegments(String tableNameWithType, Map<String, String> taskConf
List<Header> headers = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults);
List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType);

URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
SegmentPushUtils.sendSegmentsUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
}
SegmentPushUtils.sendSegmentsUriAndMetadata(spec, LOCAL_PINOT_FS, segmentUriToTarPathMap, headers, parameters);
}

private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
private void pushSegment(String tableName, Map<String, String> taskConfigs, URI localSegmentTarURI,
List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
throws Exception {
String pushMode =
taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name());
LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, localSegmentTarURI);

PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
Expand All @@ -462,7 +459,7 @@ private void pushSegment(String tableName, Map<String, String> taskConfigs, URI

switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
case TAR:
File tarFile = new File(outputSegmentTarURI);
File tarFile = new File(localSegmentTarURI);
String segmentName = segmentConversionResult.getSegmentName();
String tableNameWithType = segmentConversionResult.getTableNameWithType();
String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
Expand All @@ -472,12 +469,10 @@ private void pushSegment(String tableName, Map<String, String> taskConfigs, URI
case METADATA:
if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
Map<String, String> segmentUriToTarPathMap =
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
new String[]{outputSegmentTarURI.toString()});
SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
}
Map<String, String> segmentUriToTarPathMap =
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
new String[]{localSegmentTarURI.toString()});
SegmentPushUtils.sendSegmentUriAndMetadata(spec, LOCAL_PINOT_FS, segmentUriToTarPathMap, headers, parameters);
} else {
throw new RuntimeException("Output dir URI missing for metadata push");
}
Expand Down

0 comments on commit eb1fe53

Please sign in to comment.