-
Notifications
You must be signed in to change notification settings - Fork 1.5k
resume partially offline replicas in controller validation #17754
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -213,6 +213,7 @@ public class PinotLLCRealtimeSegmentManager { | |
| private final FlushThresholdUpdateManager _flushThresholdUpdateManager; | ||
| private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled; | ||
| private final boolean _isTmpSegmentAsyncDeletionEnabled; | ||
| private final boolean _isPartialOfflineReplicaRepairEnabled; | ||
| private final int _deepstoreUploadRetryTimeoutMs; | ||
| private final FileUploadDownloadClient _fileUploadDownloadClient; | ||
| private final AtomicInteger _numCompletingSegments = new AtomicInteger(0); | ||
|
|
@@ -241,6 +242,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan | |
| _deepStoreUploadExecutorPendingSegments = ConcurrentHashMap.newKeySet(); | ||
|
|
||
| _isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled(); | ||
| _isPartialOfflineReplicaRepairEnabled = controllerConf.isPartialOfflineReplicaRepairEnabled(); | ||
| _deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs(); | ||
| } | ||
|
|
||
|
|
@@ -1803,6 +1805,27 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConf | |
| updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment, | ||
| instancePartitionsMap); | ||
| } | ||
| } else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS | ||
| && _isPartialOfflineReplicaRepairEnabled) { | ||
| // Handle case where some replicas are OFFLINE while others are CONSUMING | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similar to other repairs, you should add a max segment completion time check. This ensures your repair does not mess up any actively ongoing commit.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is a harmless addition, but I think it is not needed in this scenario. Segments are supposed to be IN_PROGRESS in metadata, and we are neither creating a new consuming segment with different metadata nor making any change to mark the segment DONE in the IdealState. |
||
| // This happens when one replica fails (e.g., KafkaConsumer init error) and marks itself OFFLINE | ||
| // while other replicas continue consuming | ||
| List<String> offlineInstances = new ArrayList<>(); | ||
| for (Map.Entry<String, String> instanceEntry : instanceStateMap.entrySet()) { | ||
| if (SegmentStateModel.OFFLINE.equals(instanceEntry.getValue())) { | ||
| offlineInstances.add(instanceEntry.getKey()); | ||
| } | ||
| } | ||
|
|
||
| if (!offlineInstances.isEmpty()) { | ||
| LOGGER.info("Repairing segment: {} with {} OFFLINE replicas out of {} total replicas. " | ||
| + "Setting OFFLINE replicas back to CONSUMING: {}", latestSegmentName, offlineInstances.size(), | ||
| instanceStateMap.size(), offlineInstances); | ||
| // Set the OFFLINE replicas back to CONSUMING so they can retry | ||
| for (String offlineInstance : offlineInstances) { | ||
| instanceStateMap.put(offlineInstance, SegmentStateModel.CONSUMING); | ||
| } | ||
| } | ||
| } | ||
| // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment. | ||
| } else { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are only checking for the IN_PROGRESS status, this should work for pauseless consumption as well. Can you call this out in the PR description for clarity?