-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add capabilities to ingest from another stream without disabling the realtime table #9289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -873,25 +873,28 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea | |
| * which means it's manually triggered by admin not by automatic periodic task) | ||
| */ | ||
| public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, | ||
| boolean recreateDeletedConsumingSegment) { | ||
| boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) { | ||
| Preconditions.checkState(!_isStopping, "Segment manager is stopping"); | ||
|
|
||
| String realtimeTableName = tableConfig.getTableName(); | ||
| HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { | ||
| assert idealState != null; | ||
| boolean isTableEnabled = idealState.isEnabled(); | ||
| boolean isTablePaused = isTablePaused(idealState); | ||
| boolean offsetsHaveToChange = offsetCriteria != null; | ||
| if (isTableEnabled && !isTablePaused) { | ||
| List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = | ||
| getPartitionGroupConsumptionStatusList(idealState, streamConfig); | ||
| // Read the smallest offset when a new partition is detected | ||
| offsetsHaveToChange | ||
| ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions | ||
| : getPartitionGroupConsumptionStatusList(idealState, streamConfig); | ||
| OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); | ||
| streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); | ||
| // Read the smallest offset when a new partition is detected | ||
| streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); | ||
| List<PartitionGroupMetadata> newPartitionGroupMetadataList = | ||
| getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); | ||
| streamConfig.setOffsetCriteria(originalOffsetCriteria); | ||
| return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, | ||
| recreateDeletedConsumingSegment); | ||
| recreateDeletedConsumingSegment, offsetCriteria); | ||
| } else { | ||
| LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", | ||
| realtimeTableName, isTableEnabled, isTablePaused); | ||
|
|
@@ -1052,7 +1055,7 @@ private boolean isAllInstancesInState(Map<String, String> instanceStateMap, Stri | |
| @VisibleForTesting | ||
| IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, | ||
| IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList, | ||
| boolean recreateDeletedConsumingSegment) { | ||
| boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) { | ||
| String realtimeTableName = tableConfig.getTableName(); | ||
|
|
||
| InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); | ||
|
|
@@ -1074,6 +1077,16 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS | |
| // Get the latest segment ZK metadata for each partition | ||
| Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); | ||
|
|
||
| // create a map of <partition id, start offset> using data already fetched in newPartitionGroupMetadataList | ||
| Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>(); | ||
| for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) { | ||
| partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); | ||
| } | ||
| Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null; | ||
| if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) { | ||
| partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset; | ||
| } | ||
|
|
||
| // Walk over all partitions that we have metadata for, and repair any partitions necessary. | ||
| // Possible things to repair: | ||
| // 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING: | ||
|
|
@@ -1144,21 +1157,37 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS | |
| // 3. we should never end up with some replicas ONLINE and some OFFLINE. | ||
| if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) { | ||
| LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName); | ||
| StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); | ||
| if (partitionGroupIdToSmallestStreamOffset == null) { | ||
| // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset. | ||
| // This is to prevent fetching the same info for each and every partition group. | ||
| partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); | ||
| } | ||
| StreamPartitionMsgOffset startOffset = | ||
| selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, | ||
| partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, | ||
| latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning | ||
| createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, | ||
| partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, | ||
| segmentAssignment, instancePartitionsMap, startOffset); | ||
| newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, | ||
| instancePartitionsMap, startOffset); | ||
| } else { | ||
| if (newPartitionGroupSet.contains(partitionGroupId)) { | ||
| if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted() | ||
| && isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) { | ||
| // If we get here, that means in IdealState, the latest segment has all replicas ONLINE. | ||
| // Create a new IN_PROGRESS segment in PROPERTYSTORE, | ||
| // add it as CONSUMING segment to IDEALSTATE. | ||
| StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset()); | ||
| if (partitionGroupIdToSmallestStreamOffset == null) { | ||
|
||
| // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset. | ||
| // This is to prevent fetching the same info for each and every partition group. | ||
| partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); | ||
| } | ||
| StreamPartitionMsgOffset startOffset = | ||
| selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, | ||
| partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, | ||
| latestSegmentZKMetadata.getEndOffset()); | ||
| createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, | ||
| partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, | ||
| segmentAssignment, instancePartitionsMap, startOffset); | ||
| newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, | ||
| instancePartitionsMap, startOffset); | ||
| } else { | ||
| LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, | ||
| latestSegmentName); | ||
|
|
@@ -1220,31 +1249,14 @@ && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionG | |
| } | ||
|
|
||
| private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, | ||
| SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId, | ||
| SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, | ||
| List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, | ||
| Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment, | ||
| Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) { | ||
| int numReplicas = getNumReplicas(tableConfig, instancePartitions); | ||
| int numPartitions = newPartitionGroupMetadataList.size(); | ||
| LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName()); | ||
| LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); | ||
| StreamPartitionMsgOffset partitionGroupSmallestOffset = | ||
| getPartitionGroupSmallestOffset(streamConfig, partitionGroupId); | ||
|
|
||
| if (partitionGroupSmallestOffset != null) { | ||
| // Start offset must be higher than the start offset of the stream | ||
| if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) { | ||
| LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, | ||
| partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName()); | ||
| _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); | ||
| startOffset = partitionGroupSmallestOffset; | ||
| } | ||
| } else { | ||
| LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId, | ||
| tableConfig.getTableName(), startOffset); | ||
| _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); | ||
| } | ||
|
|
||
| CommittingSegmentDescriptor committingSegmentDescriptor = | ||
| new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0); | ||
| createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, | ||
|
|
@@ -1254,21 +1266,39 @@ private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelSt | |
| instancePartitionsMap); | ||
| } | ||
|
|
||
| @Nullable | ||
| private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) { | ||
| private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) { | ||
| OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); | ||
| streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); | ||
| List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata = | ||
| List<PartitionGroupMetadata> partitionGroupMetadataList = | ||
| getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); | ||
| streamConfig.setOffsetCriteria(originalOffsetCriteria); | ||
| StreamPartitionMsgOffset partitionStartOffset = null; | ||
| for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) { | ||
| if (info.getPartitionGroupId() == partitionGroupId) { | ||
| partitionStartOffset = info.getStartOffset(); | ||
| break; | ||
| Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); | ||
| for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { | ||
| partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); | ||
| } | ||
| return partitionGroupIdToSmallestOffset; | ||
| } | ||
|
|
||
| private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId, | ||
|
||
| Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset, | ||
| Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName, | ||
| StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) { | ||
| if (offsetCriteria != null) { | ||
| // use the fetched offset according to offset criteria | ||
| return partitionGroupIdToStartOffset.get(partitionGroupId); | ||
| } else { | ||
| // use offset from segment ZK metadata | ||
| StreamPartitionMsgOffset startOffsetInSegmentZkMetadata = offsetFactory.create(startOffsetInSegmentZkMetadataStr); | ||
| StreamPartitionMsgOffset streamSmallestOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId); | ||
| // Start offset in ZK must be higher than the start offset of the stream | ||
| if (streamSmallestOffset.compareTo(startOffsetInSegmentZkMetadata) > 0) { | ||
| LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata, | ||
| streamSmallestOffset, partitionGroupId, tableName); | ||
| _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); | ||
| return streamSmallestOffset; | ||
| } | ||
| return startOffsetInSegmentZkMetadata; | ||
| } | ||
| return partitionStartOffset; | ||
| } | ||
|
|
||
| private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) { | ||
|
|
@@ -1448,12 +1478,15 @@ public PauseStatus pauseConsumption(String tableNameWithType) { | |
| * 1) setting "isTablePaused" in ideal states to false and | ||
| * 2) triggering segment validation job to create new consuming segments in ideal states | ||
| */ | ||
| public PauseStatus resumeConsumption(String tableNameWithType) { | ||
| public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) { | ||
| IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false); | ||
|
|
||
| // trigger realtime segment validation job to resume consumption | ||
| Map<String, String> taskProperties = new HashMap<>(); | ||
| taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true"); | ||
| if (offsetCriteria != null) { | ||
| taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria); | ||
| } | ||
| _helixResourceManager | ||
| .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment before this line describing what this map is supposed to contain?
The logic in this class is getting quite hard to read; can we even base-class some methods and sub-class the partitionGroup vs partitionId handling for the two different types of streams we support?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add the comments.
For the refactoring, I agree with you that this class is hard to read. It already has more than 1500 lines and it's worth refactoring, but the refactoring is outside the scope of this PR.