Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public static class ControllerPeriodicTasksConf {
"controller.segment.level.validation.intervalPeriod";
public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
"controller.segment.error.autoReset";
public static final String ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR =
"controller.realtime.segment.partialOfflineReplicaRepairEnabled";
public static final String DISASTER_RECOVERY_MODE_CONFIG_KEY = "controller.segment.disaster.recovery.mode";

// Initial delays
Expand Down Expand Up @@ -1133,6 +1135,10 @@ public boolean isAutoResetErrorSegmentsOnValidationEnabled() {
return getProperty(ControllerPeriodicTasksConf.AUTO_RESET_ERROR_SEGMENTS_VALIDATION, true);
}

/**
 * Whether the controller should repair CONSUMING segments that have a subset of replicas
 * stuck in OFFLINE state (the remaining replicas still consuming). Disabled by default.
 */
public boolean isPartialOfflineReplicaRepairEnabled() {
  boolean enabledByDefault = false;
  return getProperty(ControllerPeriodicTasksConf.ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR, enabledByDefault);
}

/**
 * Returns the configured disaster recovery mode, parsed from the
 * {@code controller.segment.disaster.recovery.mode} property.
 */
public DisasterRecoveryMode getDisasterRecoveryMode() {
return getDisasterRecoveryMode(getProperty(ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public class PinotLLCRealtimeSegmentManager {
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
private final boolean _isTmpSegmentAsyncDeletionEnabled;
private final boolean _isPartialOfflineReplicaRepairEnabled;
private final int _deepstoreUploadRetryTimeoutMs;
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
Expand Down Expand Up @@ -241,6 +242,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
_deepStoreUploadExecutorPendingSegments = ConcurrentHashMap.newKeySet();

_isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled();
_isPartialOfflineReplicaRepairEnabled = controllerConf.isPartialOfflineReplicaRepairEnabled();
_deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
}

Expand Down Expand Up @@ -1803,6 +1805,27 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConf
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
instancePartitionsMap);
}
} else if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are only checking for IN_PROGRESS status, this should work for pauseless consumption as well. Can you call this out in the PR description for clarity?

&& _isPartialOfflineReplicaRepairEnabled) {
// Handle case where some replicas are OFFLINE while others are CONSUMING
Copy link
Contributor

@anuragrai16 anuragrai16 Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to other repairs, you should add a max segment completion time check:

if (!isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
        continue; 
    }

This ensures your repair does not mess up any actively ongoing commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

@rohityadav1993 rohityadav1993 Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a harmless addition, but I think this is not needed in this scenario. Segments are supposed to be IN_PROGRESS in metadata; we are not creating a new consuming segment with different metadata, nor are we making any change to mark the segment DONE in the IdealState.

// This happens when one replica fails (e.g., KafkaConsumer init error) and marks itself OFFLINE
// while other replicas continue consuming
List<String> offlineInstances = new ArrayList<>();
for (Map.Entry<String, String> instanceEntry : instanceStateMap.entrySet()) {
if (SegmentStateModel.OFFLINE.equals(instanceEntry.getValue())) {
offlineInstances.add(instanceEntry.getKey());
}
}

if (!offlineInstances.isEmpty()) {
LOGGER.info("Repairing segment: {} with {} OFFLINE replicas out of {} total replicas. "
+ "Setting OFFLINE replicas back to CONSUMING: {}", latestSegmentName, offlineInstances.size(),
instanceStateMap.size(), offlineInstances);
// Set the OFFLINE replicas back to CONSUMING so they can retry
for (String offlineInstance : offlineInstances) {
instanceStateMap.put(offlineInstance, SegmentStateModel.CONSUMING);
}
}
}
// else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,77 @@ public void testRepairs() {
testRepairs(segmentManager, Lists.newArrayList(1));
}

@Test
public void testPartialOfflineReplicaRepair() {
  // Table with 3 replicas across 5 instances and 2 partitions; partial-offline repair ENABLED.
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager segmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager, true);
  setUpNewTable(segmentManager, 3, 5, 2);
  Map<String, Map<String, String>> idealStateMap = segmentManager._idealState.getRecord().getMapFields();

  // Simulate issue #11314: one replica of the partition-0 CONSUMING segment drops to OFFLINE.
  String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
  Map<String, String> stateMap = idealStateMap.get(segmentName);
  assertNotNull(stateMap);
  assertEquals(stateMap.size(), 3);

  // Pick an arbitrary replica and knock it OFFLINE.
  String brokenInstance = stateMap.keySet().iterator().next();
  assertEquals(stateMap.get(brokenInstance), SegmentStateModel.CONSUMING);
  stateMap.put(brokenInstance, SegmentStateModel.OFFLINE);

  // Sanity-check the mixed state: 2 replicas CONSUMING, 1 OFFLINE.
  int numConsuming = 0;
  int numOffline = 0;
  for (String state : stateMap.values()) {
    if (state.equals(SegmentStateModel.CONSUMING)) {
      numConsuming++;
    } else if (state.equals(SegmentStateModel.OFFLINE)) {
      numOffline++;
    }
  }
  assertEquals(numConsuming, 2);
  assertEquals(numOffline, 1);

  // The repair pass should flip the OFFLINE replica back to CONSUMING.
  segmentManager._exceededMaxSegmentCompletionTime = true;
  segmentManager.ensureAllPartitionsConsuming();

  stateMap = idealStateMap.get(segmentName);
  assertEquals(new HashSet<>(stateMap.values()), Collections.singleton(SegmentStateModel.CONSUMING));
  assertEquals(stateMap.size(), 3);
  assertEquals(stateMap.get(brokenInstance), SegmentStateModel.CONSUMING);
}

@Test
public void testPartialOfflineReplicaRepairDisabled() {
  // Same topology (3 replicas, 5 instances, 2 partitions) but with the repair feature DISABLED.
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager segmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager, false);
  setUpNewTable(segmentManager, 3, 5, 2);
  Map<String, Map<String, String>> idealStateMap = segmentManager._idealState.getRecord().getMapFields();

  // Knock one replica of the partition-0 CONSUMING segment OFFLINE.
  String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
  Map<String, String> stateMap = idealStateMap.get(segmentName);
  assertNotNull(stateMap);
  String brokenInstance = stateMap.keySet().iterator().next();
  stateMap.put(brokenInstance, SegmentStateModel.OFFLINE);

  // Snapshot the ideal state so we can prove the repair pass left it untouched.
  Map<String, Map<String, String>> snapshot = cloneInstanceStatesMap(idealStateMap);

  segmentManager._exceededMaxSegmentCompletionTime = true;
  segmentManager.ensureAllPartitionsConsuming();

  // Feature disabled: the OFFLINE replica must remain OFFLINE and nothing else may change.
  stateMap = idealStateMap.get(segmentName);
  assertEquals(stateMap.get(brokenInstance), SegmentStateModel.OFFLINE);
  assertEquals(snapshot.get(segmentName), stateMap);
}

/**
* Removes the new CONSUMING segment and sets the latest committed (ONLINE) segment to CONSUMING if exists in the
* ideal state.
Expand Down Expand Up @@ -2127,6 +2198,21 @@ private static class FakePinotLLCRealtimeSegmentManager extends PinotLLCRealtime
_mockResourceManager = pinotHelixResourceManager;
}

// Variant constructor that lets tests toggle the partial-offline-replica repair feature via
// a purpose-built ControllerConf (see createControllerConf).
FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
boolean enablePartialOfflineReplicaRepair) {
super(pinotHelixResourceManager, createControllerConf(enablePartialOfflineReplicaRepair),
mock(ControllerMetrics.class));
_mockResourceManager = pinotHelixResourceManager;
}

/**
 * Builds a minimal ControllerConf for tests, with the partial-offline-replica repair flag set
 * to the given value and the data dir pointed at the test temp directory.
 */
private static ControllerConf createControllerConf(boolean enablePartialOfflineReplicaRepair) {
  ControllerConf controllerConf = new ControllerConf();
  controllerConf.setDataDir(TEMP_DIR.toString());
  controllerConf.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR,
      enablePartialOfflineReplicaRepair);
  return controllerConf;
}

private static PinotHelixResourceManager createMockedResourceManager() {
PinotHelixResourceManager mockResourceManager = mock(PinotHelixResourceManager.class);
HelixManager mockHelixManager = mock(HelixManager.class);
Expand Down
Loading