Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void start()
minionMetrics.setValueOfGlobalGauge(MinionGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
MinionMetrics.register(minionMetrics);
minionContext.setMinionMetrics(minionMetrics);
minionContext.setAllowDownloadFromServer(_config.isAllowDownloadFromServer());

// Install default SSL context if necessary (even if not force-enabled everywhere)
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, CommonConstants.Minion.MINION_TLS_PREFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public int getEndReplaceSegmentsTimeoutMs() {
return getProperty(END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY, DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS);
}

public boolean isAllowDownloadFromServer() {
return Boolean.parseBoolean(getProperty(CommonConstants.Minion.CONFIG_OF_ALLOW_DOWNLOAD_FROM_SERVER,
CommonConstants.Minion.DEFAULT_ALLOW_DOWNLOAD_FROM_SERVER));
}

public PinotConfiguration getMetricsConfig() {
return subset(CommonConstants.Minion.METRICS_CONFIG_PREFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static MinionContext getInstance() {
// For PurgeTask
private SegmentPurger.RecordPurgerFactory _recordPurgerFactory;
private SegmentPurger.RecordModifierFactory _recordModifierFactory;
private boolean _allowDownloadFromServer;

public File getDataDir() {
return _dataDir;
Expand Down Expand Up @@ -119,4 +120,12 @@ public void setHelixManager(HelixManager helixManager) {
public HelixManager getHelixManager() {
return _helixManager;
}

public void setAllowDownloadFromServer(boolean allowDownloadFromServer) {
_allowDownloadFromServer = allowDownloadFromServer;
}

public boolean isAllowDownloadFromServer() {
return _allowDownloadFromServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected void downloadSegmentToLocal(String tableNameWithType, String segmentNa
} catch (Exception e) {
LOGGER.error("Segment download failed from deepstore for {}, crypter:{}", deepstoreURL, crypterName, e);
String peerDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, taskType) && peerDownloadScheme != null) {
if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, taskType,
MINION_CONTEXT.isAllowDownloadFromServer()) && peerDownloadScheme != null) {
LOGGER.info("Trying to download from servers for segment {} post deepstore download failed", segmentName);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, peerDownloadScheme, () -> {
List<URI> uris =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,16 @@ public static List<String> getServers(String segmentName, String tableNameWithTy
/**
* Extract allowDownloadFromServer config from table task config
*/
public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConfig, String taskType) {
public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConfig, String taskType,
boolean defaultValue) {
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig != null) {
Map<String, String> configs = tableTaskConfig.getConfigsForTaskType(taskType);
if (configs != null && !configs.isEmpty()) {
return Boolean.parseBoolean(configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER,
String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER)));
String.valueOf(defaultValue)));
}
}
return TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER;
return defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ public void testExtractMinionAllowDownloadFromServer() {

// Test when the configuration is not set, should return the default value which is false
assertFalse(MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
MinionConstants.MergeRollupTask.TASK_TYPE));
MinionConstants.MergeRollupTask.TASK_TYPE, false));

// Test when the configuration is set to true
configs.put(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, "true");
tableTaskConfig = new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, configs));
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("sampleTable")
.setTaskConfig(tableTaskConfig).build();
assertTrue(MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
MinionConstants.MergeRollupTask.TASK_TYPE));
MinionConstants.MergeRollupTask.TASK_TYPE, false));

// Test when the configuration is set to false
configs.put(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, "false");
tableTaskConfig = new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, configs));
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("sampleTable")
.setTaskConfig(tableTaskConfig).build();
assertFalse(MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
MinionConstants.MergeRollupTask.TASK_TYPE));
MinionConstants.MergeRollupTask.TASK_TYPE, false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,8 @@ public static class Minion {
public static final String CONFIG_OF_EVENT_OBSERVER_CLEANUP_DELAY_IN_SEC =
"pinot.minion.event.observer.cleanupDelayInSec";
public static final char TASK_LIST_SEPARATOR = ',';
public static final String CONFIG_OF_ALLOW_DOWNLOAD_FROM_SERVER = "pinot.minion.task.allow.download.from.server";
public static final String DEFAULT_ALLOW_DOWNLOAD_FROM_SERVER = "false";
}

public static class ControllerJob {
Expand Down