Skip to content

Commit

Permalink
Revert "Using local copy of segment instead of downloading from remote (#12863)" (#13114)
Browse files Browse the repository at this point in the history

This reverts commit af8fd40.
  • Loading branch information
swaminathanmanish authored May 9, 2024
1 parent 760e952 commit 2adb4d7
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
Expand All @@ -79,7 +78,6 @@
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";
private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();

private static final int DEFUALT_PUSH_ATTEMPTS = 5;
private static final int DEFAULT_PUSH_PARALLELISM = 1;
Expand Down Expand Up @@ -287,11 +285,14 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig

String pushMode =
configs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name());
URI outputSegmentTarURI;
if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
!= BatchConfigProperties.SegmentPushType.TAR) {
URI outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile);
outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile);
LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
outputSegmentTarURI);
} else {
outputSegmentTarURI = convertedTarredSegmentFile.toURI();
}

List<Header> httpHeaders = new ArrayList<>();
Expand All @@ -315,7 +316,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
tableTypeParameter);

pushSegment(tableNameParameter.getValue(), configs, convertedTarredSegmentFile.toURI(), httpHeaders, parameters,
pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters,
segmentConversionResult);
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
Expand All @@ -337,12 +338,12 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
}
}

private void pushSegment(String tableName, Map<String, String> taskConfigs, URI localSegmentTarURI,
private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
throws Exception {
String pushMode =
taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name());
LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, localSegmentTarURI);
LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);

PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
Expand All @@ -355,7 +356,7 @@ private void pushSegment(String tableName, Map<String, String> taskConfigs, URI

switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
case TAR:
File tarFile = new File(localSegmentTarURI);
File tarFile = new File(outputSegmentTarURI);
String segmentName = segmentConversionResult.getSegmentName();
String tableNameWithType = segmentConversionResult.getTableNameWithType();
String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
Expand All @@ -365,11 +366,12 @@ private void pushSegment(String tableName, Map<String, String> taskConfigs, URI
case METADATA:
if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
Map<String, String> segmentUriToTarPathMap =
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
new String[]{localSegmentTarURI.toString()});
// Use local FS to avoid copying segment from deep store.
SegmentPushUtils.sendSegmentUriAndMetadata(spec, LOCAL_PINOT_FS, segmentUriToTarPathMap, headers, parameters);
try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
Map<String, String> segmentUriToTarPathMap =
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
new String[]{outputSegmentTarURI.toString()});
SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
}
} else {
throw new RuntimeException("Output dir URI missing for metadata push");
}
Expand Down

0 comments on commit 2adb4d7

Please sign in to comment.