Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract common utils used to preload segments for future reuse #14161

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.util.concurrent.AtomicDouble;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -33,7 +32,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -42,39 +40,33 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -247,10 +239,9 @@ public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
_serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.UPSERT_PRELOAD_TIME_MS, duration,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
// Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be
// created. Once TableDataManager is created, no more segment preloading would happen, and the normal segment
// loading logic would be used. The segments not being preloaded successfully here would be loaded via the
// normal segment loading logic, the one doing more costly checks on the upsert metadata.
// Continue even if preloading fails: segments that were not preloaded successfully still get loaded via the
// normal segment loading logic, as done on the Helix task threads, albeit with more costly checks on the
// upsert metadata.
_logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", _partitionId,
_tableNameWithType, e);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_PRELOAD_FAILURE, 1);
Expand All @@ -264,115 +255,20 @@ public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
}
}

/**
 * Preloads the segments of this partition that are assigned to this server, for fast upsert metadata recovery.
 * This hook method is kept so that subclasses can extend the preloading logic as needed.
 *
 * <p>A segment from the table's ideal state is submitted for preloading only when all of the following hold:
 * <ul>
 *   <li>its ideal state for this server instance is ONLINE;</li>
 *   <li>its partition id equals the partition id of this manager;</li>
 *   <li>a validDocIds snapshot file exists for it.</li>
 * </ul>
 * One task per selected segment is submitted to {@code segmentPreloadExecutor}, and this method blocks until all
 * of them are done. If waiting on any task fails, the tasks that are not yet done are cancelled.
 *
 * @param tableDataManager table data manager used to locate and load the segments
 * @param indexLoadingConfig index loading config, which also provides the table config
 * @param helixManager Helix manager used to read the ideal state and the segment ZK metadata
 * @param segmentPreloadExecutor executor that runs the per-segment preload tasks
 * @throws Exception if the ZK metadata or the partition id of an ONLINE segment cannot be found, or if waiting
 *     for a preload task fails
 */
protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig,
    HelixManager helixManager, ExecutorService segmentPreloadExecutor)
    throws Exception {
  _logger.info("Preload segments from partition: {} of table: {} for fast upsert metadata recovery", _partitionId,
      _tableNameWithType);
  String instanceId = getInstanceId(tableDataManager);
  Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment(helixManager);
  Map<String, SegmentZKMetadata> segmentMetadataMap = getSegmentsZKMetadata(helixManager);
  List<Future<?>> futures = new ArrayList<>();
  for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
    String segmentName = entry.getKey();
    Map<String, String> instanceStateMap = entry.getValue();
    String state = instanceStateMap.get(instanceId);
    if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
      // A null state means the segment is not assigned to this instance at all, which is the common case and
      // therefore only logged at debug level.
      if (state == null) {
        _logger.debug("Skip segment: {} as it's not assigned to instance: {}", segmentName, instanceId);
      } else {
        _logger.info("Skip segment: {} as its ideal state: {} is not ONLINE for instance: {}", segmentName, state,
            instanceId);
      }
      continue;
    }
    SegmentZKMetadata segmentZKMetadata = segmentMetadataMap.get(segmentName);
    Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s",
        segmentName, _tableNameWithType);
    Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, null);
    // Use the message-template overload so that the error message is only formatted when the check fails,
    // instead of calling String.format() for every segment.
    Preconditions.checkNotNull(partitionId,
        "Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName, _tableNameWithType);
    if (partitionId != _partitionId) {
      _logger.debug("Skip segment: {} as its partition: {} is different from the requested partition: {}",
          segmentName, partitionId, _partitionId);
      continue;
    }
    if (!hasValidDocIdsSnapshot(tableDataManager, indexLoadingConfig.getTableConfig(), segmentName,
        segmentZKMetadata.getTier())) {
      _logger.info("Skip segment: {} from partition: {} as no validDocIds snapshot exists", segmentName,
          _partitionId);
      continue;
    }
    futures.add(segmentPreloadExecutor.submit(
        () -> doPreloadSegmentWithSnapshot(tableDataManager, segmentName, indexLoadingConfig, segmentZKMetadata)));
  }
  try {
    // Block until every submitted preload task has finished.
    for (Future<?> f : futures) {
      f.get();
    }
  } finally {
    // If waiting was cut short by an exception, cancel whatever is still pending or running.
    for (Future<?> f : futures) {
      if (!f.isDone()) {
        f.cancel(true);
      }
    }
  }
  _logger.info("Preloaded {} segments from partition: {} of table: {} for fast upsert metadata recovery",
      futures.size(), _partitionId, _tableNameWithType);
}

/**
 * Returns the instance id read from the instance data manager config of the given table data manager.
 */
private String getInstanceId(TableDataManager tableDataManager) {
return tableDataManager.getInstanceDataManagerConfig().getInstanceId();
}

/**
 * Checks whether the validDocIds snapshot file exists inside the segment directory of the given segment.
 *
 * @return {@code true} if the snapshot file exists; {@code false} if it does not exist or if any exception is
 *     thrown while resolving the segment directory
 */
private static boolean hasValidDocIdsSnapshot(TableDataManager tableDataManager, TableConfig tableConfig,
    String segmentName, String segmentTier) {
  try {
    File segmentDataDir = tableDataManager.getSegmentDataDir(segmentName, segmentTier, tableConfig);
    File segmentDirectory = SegmentDirectoryPaths.findSegmentDirectory(segmentDataDir);
    return new File(segmentDirectory, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists();
  } catch (Exception ignored) {
    // Any failure to resolve the directory is treated the same as a missing snapshot.
    return false;
  }
}

/**
 * Fetches the ideal state of this table and returns the map fields of its record, which the caller reads as
 * segment name to (instance id to state).
 *
 * @throws IllegalStateException if no ideal state is found for the table
 */
@VisibleForTesting
Map<String, Map<String, String>> getSegmentAssignment(HelixManager helixManager) {
  IdealState tableIdealState = HelixHelper.getTableIdealState(helixManager, _tableNameWithType);
  boolean idealStateFound = tableIdealState != null;
  Preconditions.checkState(idealStateFound, "Failed to find ideal state for table: %s", _tableNameWithType);
  Map<String, Map<String, String>> mapFields = tableIdealState.getRecord().getMapFields();
  return mapFields;
}

/**
 * Reads the ZK metadata of the segments of this table from the Helix property store and indexes it by segment
 * name.
 *
 * @return a new map from segment name to the segment's ZK metadata
 */
@VisibleForTesting
Map<String, SegmentZKMetadata> getSegmentsZKMetadata(HelixManager helixManager) {
  List<SegmentZKMetadata> allSegmentsZKMetadata =
      ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(), _tableNameWithType);
  Map<String, SegmentZKMetadata> metadataBySegmentName = new HashMap<>();
  for (SegmentZKMetadata zkMetadata : allSegmentsZKMetadata) {
    metadataBySegmentName.put(zkMetadata.getSegmentName(), zkMetadata);
  }
  return metadataBySegmentName;
}

/**
 * Tries to preload a single segment by calling {@code tableDataManager.tryLoadExistingSegment()}, logging before
 * and after the attempt. Any exception thrown by the load is caught and logged as a warning, so a failure to
 * preload one segment is not propagated out of the try block.
 *
 * @param tableDataManager table data manager used to load the segment
 * @param segmentName name of the segment, used for logging
 * @param indexLoadingConfig index loading config passed through to the load call
 * @param segmentZKMetadata ZK metadata of the segment passed through to the load call
 */
@VisibleForTesting
void doPreloadSegmentWithSnapshot(TableDataManager tableDataManager, String segmentName,
IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) {
try {
_logger.info("Preload segment: {} from partition: {} of table: {}", segmentName, _partitionId,
_tableNameWithType);
// This method checks segment crc and if it has changed, the segment is not loaded. It might modify the
// file on disk, but we don't need to take the segmentLock, because every segment from the current table is
// processed by at most one thread from the preloading thread pool. HelixTaskExecutor task threads about to
// process segments from the same table are blocked on _preloadLock.
// In fact, taking segmentLock during segment preloading phase could cause deadlock when HelixTaskExecutor
// threads processing other tables have taken the same segmentLock as decided by the hash of table name and
// segment name, i.e. due to hash collision.
tableDataManager.tryLoadExistingSegment(segmentZKMetadata, indexLoadingConfig);
_logger.info("Preloaded segment: {} from partition: {} of table: {}", segmentName, _partitionId,
_tableNameWithType);
} catch (Exception e) {
// Best effort: log the failure and move on without rethrowing.
_logger.warn("Failed to preload segment: {} from partition: {} of table: {}, skipping", segmentName, _partitionId,
_tableNameWithType, e);
}
// NOTE(review): the statements below use helixManager and segmentPreloadExecutor, which are not parameters of
// this method, and the lambda re-declares segmentName and segmentZKMetadata. They look like the body of
// doPreloadSegments interleaved here by the diff view rather than part of this method - confirm against the
// actual file.
// As written, the lambda returns true when SegmentPreloadUtils.hasValidDocIdsSnapshot() reports a snapshot for
// the segment on its tier, and otherwise logs the skip and returns false.
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
SegmentPreloadUtils.preloadSegments(tableDataManager, _partitionId, indexLoadingConfig, helixManager,
segmentPreloadExecutor, (segmentName, segmentZKMetadata) -> {
String tier = segmentZKMetadata.getTier();
if (SegmentPreloadUtils.hasValidDocIdsSnapshot(tableDataManager, tableConfig, segmentName, tier)) {
return true;
}
_logger.info("Skip segment: {} on tier: {} as it has no validDocIds snapshot", segmentName, tier);
return false;
});
}

@Override
Expand Down Expand Up @@ -821,19 +717,15 @@ public void removeSegment(IndexSegment segment) {
_logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
return;
}
// Skip removing the upsert metadata of segment that has max comparison value smaller than
// (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The expired metadata is removed while creating
// new consuming segment in batches.
boolean skipRemoveMetadata = false;
if (isOutOfMetadataTTL(segment)) {
_logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
skipRemoveMetadata = true;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a quick fix that is not directly related to the preloading refactoring. Basically, it moves the check into the try-catch block in case isOutOfMetadataTTL throws an exception, which should be rare but is worth guarding against.

if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
try {
if (!skipRemoveMetadata) {
// Skip removing the upsert metadata of segment that is out of metadata TTL. The expired metadata is removed
// while creating new consuming segment in batches.
if (isOutOfMetadataTTL(segment)) {
_logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
} else {
doRemoveSegment(segment);
}
_trackedSegments.remove(segment);
Expand Down
Loading
Loading