Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand All @@ -34,12 +35,15 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Util methods for TierConfig
*/
public final class TierConfigUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TierConfigUtils.class);

private TierConfigUtils() {
}
Expand All @@ -56,23 +60,47 @@ public static String normalizeTierName(String tierName) {
}

public static String getDataDirForTier(TableConfig tableConfig, String tierName) {
return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
}

public static String getDataDirForTier(TableConfig tableConfig, String tierName,
Map<String, Map<String, String>> instanceTierConfigs) {
String tableNameWithType = tableConfig.getTableName();
String dataDir = null;
List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
Preconditions.checkState(CollectionUtils.isNotEmpty(tierCfgs), "No tierConfigs for table: %s", tableNameWithType);
TierConfig tierCfg = null;
for (TierConfig tc : tierCfgs) {
if (tierName.equals(tc.getName())) {
tierCfg = tc;
break;
if (CollectionUtils.isNotEmpty(tierCfgs)) {
TierConfig tierCfg = null;
for (TierConfig tc : tierCfgs) {
if (tierName.equals(tc.getName())) {
tierCfg = tc;
break;
}
}
if (tierCfg != null) {
Map<String, String> backendProps = tierCfg.getTierBackendProperties();
if (backendProps != null) {
dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
} else {
LOGGER.debug("No backend props for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
}
if (StringUtils.isNotEmpty(dataDir)) {
LOGGER.debug("Got dataDir: {} for tier: {} in TableConfig of table: {}", dataDir, tierName,
tableNameWithType);
return dataDir;
} else {
LOGGER.debug("No dataDir for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
}
}
}
Preconditions.checkNotNull(tierCfg, "No configs for tier: %s on table: %s", tierName, tableNameWithType);
// TODO: check if the tier configs are predefined in ClusterConfigs.
Map<String, String> backendProps = tierCfg.getTierBackendProperties();
Preconditions
.checkNotNull(backendProps, "No backend properties for tier: %s on table: %s", tierName, tableNameWithType);
String dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
Preconditions.checkState(StringUtils.isNotEmpty(dataDir), "No dataDir for tier: %s on table: %s", tierName,
// Check if there is data path defined in instance tier configs.
Map<String, String> instanceCfgs = instanceTierConfigs.get(tierName);
if (instanceCfgs != null) {
// All instance config names are lower cased while being passed down here.
dataDir = instanceCfgs.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR.toLowerCase());
}
Preconditions.checkState(StringUtils.isNotEmpty(dataDir), "No dataDir for tier: %s for table: %s", tierName,
tableNameWithType);
LOGGER.debug("Got dataDir: {} for tier: {} for table: {} in instance configs", dataDir, tierName,
tableNameWithType);
return dataDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -193,26 +195,34 @@ public void testGetDataDirForTier() {
try {
TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "No tierConfigs for table: myTable_OFFLINE");
Assert.assertEquals(e.getMessage(), "No dataDir for tier: tier1 for table: myTable_OFFLINE");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
.newArrayList(new TierConfig("myTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null, null))).build();
try {
TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "No configs for tier: tier1 on table: myTable_OFFLINE");
Assert.assertEquals(e.getMessage(), "No dataDir for tier: tier1 for table: myTable_OFFLINE");
}
try {
TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "No backend properties for tier: myTier on table: myTable_OFFLINE");
Assert.assertEquals(e.getMessage(), "No dataDir for tier: myTier for table: myTable_OFFLINE");
}
// Provide instance tierConfigs for the tier.
Map<String, Map<String, String>> instanceTierConfigs = new HashMap<>();
Map<String, String> tierCfgMap = new HashMap<>();
tierCfgMap.put("datadir", "/abc/xyz");
instanceTierConfigs.put("myTier", tierCfgMap);
String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier", instanceTierConfigs);
Assert.assertEquals(dataDir, "/abc/xyz");
// Table tierConfigs overwrite those from instance tierConfigs.
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
.newArrayList(new TierConfig("myTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null,
Collections.singletonMap("dataDir", "/foo/bar")))).build();
String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier", instanceTierConfigs);
Assert.assertEquals(dataDir, "/foo/bar");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void migrateToTargetTier(String tableNameWithType) {
try {
TableTierReader.TableTierDetails tableTiers =
new TableTierReader(_executorService, _connectionManager, _pinotHelixResourceManager).getTableTierDetails(
tableNameWithType, null, _timeoutMs);
tableNameWithType, null, _timeoutMs, true);
triggerLocalTierMigration(tableNameWithType, tableTiers,
_pinotHelixResourceManager.getHelixZkManager().getMessagingService());
LOGGER.info("Migrated segments of table: {} to new tiers on hosting servers", tableNameWithType);
Expand Down Expand Up @@ -218,14 +218,14 @@ static void updateSegmentTargetTier(String tableNameWithType, String segmentName
String targetTierName = null;
if (targetTier == null) {
if (segmentZKMetadata.getTier() == null) {
LOGGER.debug("Segment: {} of table: {} is already on the default tier", segmentName, tableNameWithType);
LOGGER.debug("Segment: {} of table: {} is already set to go to default tier", segmentName, tableNameWithType);
return;
}
LOGGER.info("Segment: {} of table: {} is put back on default tier", segmentName, tableNameWithType);
} else {
targetTierName = targetTier.getName();
if (targetTierName.equals(segmentZKMetadata.getTier())) {
LOGGER.debug("Segment: {} of table: {} is already on the target tier: {}", segmentName, tableNameWithType,
LOGGER.debug("Segment: {} of table: {} is already set to go to target tier: {}", segmentName, tableNameWithType,
targetTierName);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public TableTierReader(Executor executor, HttpConnectionManager connectionManage
*/
public TableTierDetails getTableTierDetails(String tableNameWithType, @Nullable String segmentName, int timeoutMs)
throws InvalidConfigException {
return getTableTierDetails(tableNameWithType, segmentName, timeoutMs, false);
}

public TableTierDetails getTableTierDetails(String tableNameWithType, @Nullable String segmentName, int timeoutMs,
boolean skipErrors)
throws InvalidConfigException {
Map<String, List<String>> serverToSegmentsMap = new HashMap<>();
if (segmentName == null) {
serverToSegmentsMap.putAll(_helixResourceManager.getServerToSegmentsMap(tableNameWithType));
Expand All @@ -89,8 +95,11 @@ public TableTierDetails getTableTierDetails(String tableNameWithType, @Nullable
List<String> expectedSegmentsOnServer = entry.getValue();
TableTierInfo tableTierInfo = serverToTableTierInfoMap.get(server);
for (String expectedSegment : expectedSegmentsOnServer) {
tableTierDetails._segmentCurrentTiers.computeIfAbsent(expectedSegment, (k) -> new HashMap<>()).put(server,
(tableTierInfo == null) ? ERROR_RESP_NO_RESPONSE : getSegmentTier(expectedSegment, tableTierInfo));
String tier = tableTierInfo == null ? ERROR_RESP_NO_RESPONSE : getSegmentTier(expectedSegment, tableTierInfo);
if (!skipErrors || !hasError(tier)) {
tableTierDetails._segmentCurrentTiers.computeIfAbsent(expectedSegment, (k) -> new HashMap<>())
.put(server, tier);
}
}
}
if (segmentName == null) {
Expand All @@ -106,6 +115,11 @@ public TableTierDetails getTableTierDetails(String tableNameWithType, @Nullable
return tableTierDetails;
}

private static boolean hasError(String tier) {
return ERROR_RESP_MISSING_SEGMENT.equals(tier) || ERROR_RESP_NO_RESPONSE.equals(tier)
|| ERROR_RESP_NOT_IMMUTABLE.equals(tier);
}

private static String getSegmentTier(String expectedSegment, TableTierInfo tableTierInfo) {
if (tableTierInfo.getMutableSegments().contains(expectedSegment)) {
return ERROR_RESP_NOT_IMMUTABLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
String segmentTier = getSegmentCurrentTier(segmentName);
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig());
try {
// Create backup directory to handle failure of segment reloading.
Expand Down Expand Up @@ -415,6 +416,7 @@ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoad
String segmentTier = zkMetadata.getTier();
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
if (localMetadata == null && tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata)) {
return;
}
Expand Down Expand Up @@ -639,7 +641,8 @@ File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableCo
return getSegmentDataDir(segmentName);
}
try {
String tierDataDir = TierConfigUtils.getDataDirForTier(tableConfig, segmentTier);
String tierDataDir =
TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _tableDataManagerConfig.getInstanceTierConfigs());
File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
return new File(tierTableDataDir, segmentName);
} catch (Exception e) {
Expand Down Expand Up @@ -793,6 +796,7 @@ private SegmentDirectory initSegmentDirectory(String segmentName, String segment
.setSchema(indexLoadingConfig.getSchema()).setInstanceId(indexLoadingConfig.getInstanceId())
.setTableDataDir(indexLoadingConfig.getTableDataDir()).setSegmentName(segmentName).setSegmentCrc(segmentCrc)
.setSegmentTier(indexLoadingConfig.getSegmentTier())
.setInstanceTierConfigs(indexLoadingConfig.getInstanceTierConfigs())
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs()).build();
SegmentDirectoryLoader segmentDirectoryLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
*/
package org.apache.pinot.segment.local.data.manager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
Expand All @@ -42,11 +47,15 @@ public class TableDataManagerConfig {
private static final String TABLE_DATA_MANAGER_NAME = "name";
private static final String TABLE_IS_DIMENSION = "isDimTable";
private static final String TABLE_DATA_MANAGER_AUTH = "auth";

private static final String TABLE_DATA_MANAGER_TIER_CONFIGS = "tierConfigs";
private static final String TIER_CONFIGS_TIER_NAMES = "tierNames";
private static final String TABLE_DELETED_SEGMENTS_CACHE_SIZE = "deletedSegmentsCacheSize";
private static final String TABLE_DELETED_SEGMENTS_CACHE_TTL_MINUTES = "deletedSegmentsCacheTTL";
private static final String TABLE_PEER_DOWNLOAD_SCHEME = "peerDownloadScheme";

private final Configuration _tableDataManagerConfig;
private volatile Map<String, Map<String, String>> _instanceTierConfigMaps;

public TableDataManagerConfig(Configuration tableDataManagerConfig) {
_tableDataManagerConfig = tableDataManagerConfig;
Expand Down Expand Up @@ -80,6 +89,41 @@ public Configuration getAuthConfig() {
return _tableDataManagerConfig.subset(TABLE_DATA_MANAGER_AUTH);
}

public Map<String, Map<String, String>> getInstanceTierConfigs() {
if (_instanceTierConfigMaps != null) {
return _instanceTierConfigMaps;
}
// Keep it simple and not handle the potential double computes as it's not that costly anyway.
_instanceTierConfigMaps = getTierConfigMaps(_tableDataManagerConfig.subset(TABLE_DATA_MANAGER_TIER_CONFIGS));
return _instanceTierConfigMaps;
}

@VisibleForTesting
static Map<String, Map<String, String>> getTierConfigMaps(Configuration allTierCfgs) {
// Note that config names from the instance configs are all lowered cased, e.g.
// - tiernames:[hotTier, coldTier]
// - hottier.datadir:/tmp/multidir_test/hotTier
// - coldtier.datadir:/tmp/multidir_test/coldTier
// And Configuration uses ',' as the list separator to get the list of strings.
// Therefore, the tier name should not contain ',' and must be unique case-insensitive.
String[] tierNames = allTierCfgs.getStringArray(TIER_CONFIGS_TIER_NAMES.toLowerCase());
if (tierNames == null) {
LOGGER.debug("No tierConfigs from instanceConfig");
return Collections.emptyMap();
}
Map<String, Map<String, String>> tierCfgMaps = new HashMap<>();
for (String tierName : tierNames) {
Configuration tierCfgs = allTierCfgs.subset(tierName.toLowerCase());
for (Iterator<String> cfgKeys = tierCfgs.getKeys(); cfgKeys.hasNext(); ) {
String cfgKey = cfgKeys.next();
String cfgValue = tierCfgs.getString(cfgKey);
tierCfgMaps.computeIfAbsent(tierName, (k) -> new HashMap<>()).put(cfgKey, cfgValue);
}
}
LOGGER.debug("Got tierConfigs: {} from instanceConfig", tierCfgMaps);
return tierCfgMaps;
}

public int getTableDeletedSegmentsCacheSize() {
return _tableDataManagerConfig.getInt(TABLE_DELETED_SEGMENTS_CACHE_SIZE);
}
Expand Down Expand Up @@ -116,6 +160,10 @@ public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
instanceDataManagerConfig.getConfig().subset(TABLE_DATA_MANAGER_AUTH).toMap()
.forEach((key, value) -> defaultConfig.setProperty(TABLE_DATA_MANAGER_AUTH + "." + key, value));

// copy tier configs
instanceDataManagerConfig.getConfig().subset(TABLE_DATA_MANAGER_TIER_CONFIGS).toMap()
.forEach((key, value) -> defaultConfig.setProperty(TABLE_DATA_MANAGER_TIER_CONFIGS + "." + key, value));

return new TableDataManagerConfig(defaultConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadi
.setSchema(schema).setInstanceId(indexLoadingConfig.getInstanceId())
.setTableDataDir(indexLoadingConfig.getTableDataDir()).setSegmentName(segmentName)
.setSegmentCrc(segmentMetadata.getCrc()).setSegmentTier(indexLoadingConfig.getSegmentTier())
.setInstanceTierConfigs(indexLoadingConfig.getInstanceTierConfigs())
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs()).build();
SegmentDirectoryLoader segmentLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,6 @@ private void deleteSegmentTierPersistedLocally(String segmentName, SegmentDirect
FileUtils.deleteQuietly(trackFile);
}

private File getSegmentDataDirOrDefault(String segmentTier, SegmentDirectoryLoaderContext loaderContext) {
File dataDir = getSegmentDataDir(segmentTier, loaderContext);
return dataDir != null ? dataDir : getDefaultDataDir(loaderContext);
}

private File getDefaultDataDir(SegmentDirectoryLoaderContext loaderContext) {
return new File(loaderContext.getTableDataDir(), loaderContext.getSegmentName());
}
Expand All @@ -211,7 +206,8 @@ private File getSegmentDataDir(String segmentTier, SegmentDirectoryLoaderContext
String tableNameWithType = tableConfig.getTableName();
String segmentName = loaderContext.getSegmentName();
try {
String tierDataDir = TierConfigUtils.getDataDirForTier(tableConfig, segmentTier);
String tierDataDir =
TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, loaderContext.getInstanceTierConfigs());
File tierTableDataDir = new File(tierDataDir, tableNameWithType);
return new File(tierTableDataDir, segmentName);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class IndexLoadingConfig {
private String _segmentTier;

private String _instanceId;
private Map<String, Map<String, String>> _instanceTierConfigs;

/**
* NOTE: This step might modify the passed in table config and schema.
Expand Down Expand Up @@ -656,4 +657,12 @@ public void setSegmentTier(String segmentTier) {
public String getSegmentTier() {
return _segmentTier;
}

public void setInstanceTierConfigs(Map<String, Map<String, String>> tierConfigs) {
_instanceTierConfigs = tierConfigs;
}

public Map<String, Map<String, String>> getInstanceTierConfigs() {
return _instanceTierConfigs;
}
}
Loading