Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ public void setCrc(long crc) {
setNonNegativeValue(Segment.CRC, crc);
}

public String getTier() {
return _simpleFields.get(Segment.TIER);
}

public void setTier(String tier) {
setValue(Segment.TIER, tier);
}

public long getCreationTime() {
return _znRecord.getLongField(Segment.CREATION_TIME, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pinot.common.utils.config;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.tier.FixedTierSegmentSelector;
import org.apache.pinot.common.tier.Tier;
Expand All @@ -30,6 +33,7 @@
import org.apache.pinot.common.tier.TimeBasedTierSegmentSelector;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand All @@ -47,6 +51,32 @@ public static boolean shouldRelocateToTiers(TableConfig tableConfig) {
return CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList());
}

public static String normalizeTierName(String tierName) {
return tierName == null ? "default" : tierName;
}

public static String getDataDirForTier(TableConfig tableConfig, String tierName) {
String tableNameWithType = tableConfig.getTableName();
List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
Preconditions.checkState(CollectionUtils.isNotEmpty(tierCfgs), "No tierConfigs for table: %s", tableNameWithType);
TierConfig tierCfg = null;
for (TierConfig tc : tierCfgs) {
if (tierName.equals(tc.getName())) {
tierCfg = tc;
break;
}
}
Preconditions.checkNotNull(tierCfg, "No configs for tier: %s on table: %s", tierName, tableNameWithType);
// TODO: check if the tier configs are predefined in ClusterConfigs.
Map<String, String> backendProps = tierCfg.getTierBackendProperties();
Preconditions
.checkNotNull(backendProps, "No backend properties for tier: %s on table: %s", tierName, tableNameWithType);
String dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
Preconditions.checkState(StringUtils.isNotEmpty(dataDir), "No dataDir for tier: %s on table: %s", tierName,
tableNameWithType);
return dataDir;
}

/**
* Gets sorted list of tiers for given storage type from provided list of TierConfig
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,33 @@ public void testTierComparator() {
Assert.assertEquals(tierComparator.compare(tier6, tier5), -1);
Assert.assertEquals(tierComparator.compare(tier4, tier7), 1);
}

@Test
public void testGetDataDirForTier() {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
try {
TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "No tierConfigs for table: myTable_OFFLINE");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
.newArrayList(new TierConfig("myTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null, null))).build();
try {
TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "No configs for tier: tier1 on table: myTable_OFFLINE");
}
try {
TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "No backend properties for tier: myTier on table: myTable_OFFLINE");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
.newArrayList(new TierConfig("myTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null,
Collections.singletonMap("dataDir", "/foo/bar")))).build();
String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
Assert.assertEquals(dataDir, "/foo/bar");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
Expand Down Expand Up @@ -223,6 +224,7 @@ public void addSegment(ImmutableSegment immutableSegment) {
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
indexLoadingConfig.setTableDataDir(_tableDataDir);
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
}

Expand Down Expand Up @@ -348,7 +350,10 @@ public Map<String, SegmentErrorInfo> getSegmentErrors() {
public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload)
throws Exception {
File indexDir = getSegmentDataDir(segmentName);
String segmentTier = getSegmentCurrentTier(segmentName);
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig());
try {
// Create backup directory to handle failure of segment reloading.
createBackup(indexDir);
Expand All @@ -366,7 +371,8 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
}
indexDir = downloadSegment(segmentName, zkMetadata);
} else {
LOGGER.info("Reload existing segment: {} of table: {}", segmentName, _tableNameWithType);
LOGGER.info("Reload existing segment: {} of table: {} on tier: {}", segmentName, _tableNameWithType,
TierConfigUtils.normalizeTierName(segmentTier));
// The indexDir is empty after calling createBackup, as it's renamed to a backup directory.
// The SegmentDirectory should initialize accordingly. Like for SegmentLocalFSDirectory, it
// doesn't load anything from an empty indexDir, but gets the info to complete the copyTo.
Expand All @@ -378,6 +384,7 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon

// Load from indexDir and replace the old segment in memory. What's inside indexDir
// may come from SegmentDirectory.copyTo() or the segment downloaded from deep store.
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema);
addSegment(segment);

Expand Down Expand Up @@ -409,6 +416,9 @@ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoad
// still on the server but the metadata object has not been initialized yet. In this case,
// we should check if the segment exists on server and try to load it. If the segment does
// not exist or fails to get loaded, we download segment from deep store to load it again.
String segmentTier = zkMetadata.getTier();
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
if (localMetadata == null && tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata)) {
return;
}
Expand All @@ -425,9 +435,11 @@ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoad
localMetadata.getCrc(), zkMetadata.getCrc());
}
File indexDir = downloadSegment(segmentName, zkMetadata);
addSegment(indexDir, indexLoadingConfig);
LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
zkMetadata.getCrc());
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema, true);
addSegment(segment);
LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {} on tier: {}", segmentName,
_tableNameWithType, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(segmentTier));
}

/**
Expand Down Expand Up @@ -590,6 +602,31 @@ File getSegmentDataDir(String segmentName) {
return new File(_indexDir, segmentName);
}

@VisibleForTesting
File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
if (segmentTier == null) {
return getSegmentDataDir(segmentName);
}
try {
String tierDataDir = TierConfigUtils.getDataDirForTier(tableConfig, segmentTier);
File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
return new File(tierTableDataDir, segmentName);
} catch (Exception e) {
LOGGER.warn("Failed to get dataDir for segment: {} of table: {} on tier: {} due to error: {}", segmentName,
_tableNameWithType, segmentTier, e.getMessage());
return getSegmentDataDir(segmentName);
}
}

@Nullable
private String getSegmentCurrentTier(String segmentName) {
SegmentDataManager segment = _segmentDataManagerMap.get(segmentName);
if (segment != null && segment.getSegment() instanceof ImmutableSegment) {
return ((ImmutableSegment) segment.getSegment()).getTier();
}
return null;
}

@VisibleForTesting
protected File getTmpSegmentDataDir(String segmentName) {
return new File(_resourceTmpDir, segmentName);
Expand Down Expand Up @@ -646,7 +683,8 @@ private void removeBackup(File indexDir)
private boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
// Try to recover the segment from potential segment reloading failure.
File indexDir = getSegmentDataDir(segmentName);
String segmentTier = zkMetadata.getTier();
File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig());
recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);

Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Expand Down Expand Up @@ -691,12 +729,12 @@ private boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig in
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema);
addSegment(segment);
LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
zkMetadata.getCrc());
LOGGER.info("Loaded existing segment: {} of table: {} with crc: {} on tier: {}", segmentName, _tableNameWithType,
zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(segmentTier));
return true;
} catch (Exception e) {
LOGGER.error("Failed to load existing segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
zkMetadata.getCrc(), e);
LOGGER.error("Failed to load existing segment: {} of table: {} with crc: {} on tier: {}", segmentName,
_tableNameWithType, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(segmentTier), e);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
}
Expand All @@ -718,12 +756,14 @@ private SegmentDirectory initSegmentDirectory(String segmentName, String segment
throws Exception {
SegmentDirectoryLoaderContext loaderContext =
new SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
.setSchema(schema).setInstanceId(indexLoadingConfig.getInstanceId()).setSegmentName(segmentName)
.setSegmentCrc(segmentCrc).setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs())
.build();
.setSchema(schema).setInstanceId(indexLoadingConfig.getInstanceId())
.setTableDataDir(indexLoadingConfig.getTableDataDir()).setSegmentName(segmentName).setSegmentCrc(segmentCrc)
.setSegmentTier(indexLoadingConfig.getSegmentTier())
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs()).build();
SegmentDirectoryLoader segmentDirectoryLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
File indexDir = getSegmentDataDir(segmentName);
File indexDir =
getSegmentDataDir(segmentName, indexLoadingConfig.getSegmentTier(), indexLoadingConfig.getTableConfig());
return segmentDirectoryLoader.load(indexDir.toURI(), loaderContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ void addRealtimeSegment(String realtimeTableName, String segmentName)
throws Exception;

/**
* Removes a segment from a table.
* Offloads a segment from table but not dropping its data from server.
*/
void removeSegment(String tableNameWithType, String segmentName)
void offloadSegment(String tableNameWithType, String segmentName)
throws Exception;

/**
* Delete segment data from the server physically.
*/
void deleteSegment(String tableNameWithType, String segmentName)
throws Exception;

/**
Expand Down
Loading