Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs());
String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
try {
List<File> inputSegmentDirs = new ArrayList<>();
int numRecords = 0;
Expand All @@ -203,8 +202,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
.format("Downloading segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), downloadURLs.length));
File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" + i);
try {
downloadSegmentToLocal(tableNameWithType, segmentName, downloadURLs[i], taskType, tarredSegmentFile,
crypterName);
downloadSegmentToLocal(tableNameWithType, segmentName, downloadURLs[i], taskType, tarredSegmentFile);
} catch (Exception e) {
LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
_minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,13 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)

File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs(), "Failed to create temporary directory: %s", tempDataDir);
String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();

try {
// Download the tarred segment file
_eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment from: " + downloadURL);
File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
LOGGER.info("Downloading segment from {} to {}", downloadURL, tarredSegmentFile.getAbsolutePath());
try {
downloadSegmentToLocal(tableNameWithType, segmentName, downloadURL, taskType, tarredSegmentFile, crypterName);
downloadSegmentToLocal(tableNameWithType, segmentName, downloadURL, taskType, tarredSegmentFile);
} catch (Exception e) {
LOGGER.error("Failed to download segment from download url: {}", downloadURL, e);
_minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,26 @@ private void addTaskMeterMetrics(MinionMeter meter, long unitCount, String table
}

protected void downloadSegmentToLocal(String tableNameWithType, String segmentName, String deepstoreURL,
String taskType, File tarredSegmentFile, String crypterName)
String taskType, File tarredSegmentFile)
throws Exception {
LOGGER.info("Downloading segment from {} to {}", deepstoreURL, tarredSegmentFile.getAbsolutePath());
LOGGER.info("Downloading segment {} from {} to {}", segmentName, deepstoreURL, tarredSegmentFile.getAbsolutePath());
TableConfig tableConfig = getTableConfig(tableNameWithType);
String crypterName = tableConfig.getValidationConfig().getCrypterClassName();
try {
// download from deepstore first
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(deepstoreURL, tarredSegmentFile, crypterName);
} catch (Exception e) {
LOGGER.error("Segment download failed from deepstore for {}, crypter:{}", deepstoreURL, crypterName, e);
TableConfig tableConfig = getTableConfig(tableNameWithType);
String peerDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, taskType) && peerDownloadScheme != null) {
LOGGER.info("Trying to download from servers for segment {} post deepstore download failed", segmentName);
SegmentFetcherFactory.getSegmentFetcher(
getTableConfig(tableNameWithType).getValidationConfig().getPeerSegmentDownloadScheme())
.fetchSegmentToLocal(segmentName, () -> {
List<URI> uris =
PeerServerSegmentFinder.getPeerServerURIs(MINION_CONTEXT.getHelixManager(), tableNameWithType,
segmentName, peerDownloadScheme);
Collections.shuffle(uris);
return uris;
},
tarredSegmentFile);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, peerDownloadScheme, () -> {
List<URI> uris =
PeerServerSegmentFinder.getPeerServerURIs(MINION_CONTEXT.getHelixManager(), tableNameWithType,
segmentName, peerDownloadScheme);
Collections.shuffle(uris);
return uris;
}, tarredSegmentFile, crypterName);
} else {
throw e;
}
Expand Down