Handle the partitioning mismatch between table config and stream #6031
2d5338a to fcf2b7a
/**
 * Integration test that enables segment partition for the LLC real-time table.
Do we also test changes to the partition function and the number of partitions?
As discussed offline, because of the existing behavior, partition routing currently won't give the correct result when the stream is not properly partitioned. I added a TODO explaining why, and will add a test in the next PR, which will address that issue.
@@ -510,6 +511,11 @@ private LLCRealtimeSegmentZKMetadata updateCommittingSegmentZKMetadata(String re
committingSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
committingSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());

// Update the partition metadata based on the segment metadata
// NOTE: When the stream partition changes, or the records are not properly partitioned from the stream, the
// partition of the segment can be different from the stream partition.
// partition of the segment can be different from the stream partition.
// partition of the segment is undefined
It is not undefined, but based on the actual consumed records. Updated the comment to reflect that.
@@ -560,7 +566,8 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, PartitionLevelS
}

@Nullable
private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId) {
private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int numPartitions,
private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int numPartitions,
private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int numStreamPartitions,
Reverted this part to keep the change minimal and preserve the current behavior. Will handle this in the next PR, which requires some perf testing.
@@ -39,6 +39,7 @@
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
REALTIME_PARTITION_MISMATCH("mismatch", false),
Can we add the metric on the controller instead? If it happens for one stream partition, it is highly likely that it will happen to all partitions, so we might as well reduce the noise.
I take that back. We need the metric to fire as soon as we detect a mismatch, so it has to be on the server.
// NOTE: Here we compare the number of partitions from the config and the stream, and log a warning and emit a
// metric when they don't match, but use the one from the stream. The mismatch could happen when the
// stream partitions are changed, but the table config has not been updated to reflect the change. In such
// case, picking the number of partitions from the stream can keep the segment properly partitioned as
We don't recognize new partitions instantly. Every time the realtime validation manager runs, it checks whether the number of partitions has changed, and if so, starts a new consuming partition.
Let us say at time T1 we checked and the number of partitions had not changed.
At time T1 + 10, the number of partitions changed, but we did not know. The stream divided the records according to a different partitioning scheme, leaving mismatched rows in (most likely) all partitions. At time T1 + 50, we check again and create the new consuming segment for the new partition we detected.
In this case, all the segments that have the mismatched rows should be marked as not belonging to any partition.
I am not sure this condition is being handled.
This class is on the server side, so it does not rely on the validation manager (which runs on the controller side) to detect the partition change.
I did not change the logic for this class, but only added some notes for readability.
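The repartitioning scenario raised in this thread can be illustrated with a small sketch. It assumes integer keys and a plain modulo partition function; `RepartitionExample` and its methods are hypothetical names for illustration only, not Pinot code. It lists the keys that a stream partition starts receiving after the stream is repartitioned, but that the old layout would have placed in a different partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of rows landing in the "wrong" partition after a stream is repartitioned.
class RepartitionExample {
  // Assumed partition function: simple modulo on an integer key.
  static int partition(int key, int numPartitions) {
    return Math.floorMod(key, numPartitions);
  }

  // Keys delivered to streamPartition under the new layout that the old layout assigned elsewhere.
  static List<Integer> mismatchedKeys(int[] keys, int streamPartition, int oldNumPartitions,
      int newNumPartitions) {
    List<Integer> mismatched = new ArrayList<>();
    for (int key : keys) {
      boolean deliveredHere = partition(key, newNumPartitions) == streamPartition;
      boolean expectedHere = partition(key, oldNumPartitions) == streamPartition;
      if (deliveredHere && !expectedHere) {
        mismatched.add(key);
      }
    }
    return mismatched;
  }
}
```

With keys 0 through 11 and a change from 4 to 6 partitions, stream partition 1 receives keys 1 and 7; key 7 maps to partition 3 under the old layout, so a segment still labeled with the old partition info would contain a mismatched row.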
// changed, but the table config has not been updated to reflect the change. In such case, picking the
// number of partitions from the stream can keep the segment properly partitioned as long as the partition
// function is not changed.
if (columnPartitionConfig.getNumPartitions() != numPartitions) {
For a realtime table, maybe we should not have the number of partitions in the table config. It can be set to -1 to indicate that it needs to come from the stream.
Good point, or we just don't read it. We can still use it as a fallback when the server fails to get the numPartitions from the stream (the existing behavior).
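A minimal sketch of the selection logic discussed here, under stated assumptions: `NumPartitionsResolver` is a hypothetical class, the `AtomicLong` stands in for the REALTIME_PARTITION_MISMATCH meter, and an empty `OptionalInt` models the server failing to fetch the partition count from the stream. It is not the actual Pinot implementation.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: prefer the stream's partition count, fall back to the table config.
class NumPartitionsResolver {
  // Stand-in for a REALTIME_PARTITION_MISMATCH meter.
  final AtomicLong mismatchCount = new AtomicLong();

  int resolve(int configNumPartitions, OptionalInt streamNumPartitions) {
    if (!streamNumPartitions.isPresent()) {
      // Could not read the partition count from the stream: use the table config value.
      return configNumPartitions;
    }
    int fromStream = streamNumPartitions.getAsInt();
    if (configNumPartitions != fromStream) {
      // Warn and count the mismatch, but still trust the stream.
      System.err.printf("Number of partitions in table config (%d) does not match the stream (%d)%n",
          configNumPartitions, fromStream);
      mismatchCount.incrementAndGet();
    }
    return fromStream;
  }
}
```

Under this sketch, a config value of -1 would need no special handling as long as the stream lookup succeeds, since the stream value is always returned when it is available.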
fcf2b7a to 6a2fc33
@mayankshriv @mcvsubbu Addressed all the comments, and minimized the change in this PR to just bring back the old behavior where we update the partition info when committing the segment.
6a2fc33 to 750a985
lgtm other than a minor comment
Description
Fix for #6029
In the consuming segment, update the partition info when ingesting new records. Log a warning and emit a metric REALTIME_PARTITION_MISMATCH when the partition is not aligned with the stream partition. The updated partition info is persisted in the segment metadata, and when the segment is committed, the partition info stored in the segment ZK metadata is updated as well.
Added SegmentPartitionLLCRealtimeClusterIntegrationTest to test the expected behavior.
NOTE: With the fix, the consuming segment can still be pruned out incorrectly if the partition info in the table config does not align with the stream. To fix that, we can persist the partition info only for the completed segments, and not for the consuming segments. Some perf testing is needed to verify the performance penalty.
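The ingestion-time behavior described above could look roughly like the following sketch. It assumes integer keys and a modulo partition function; `ConsumingSegmentPartitionTracker` is a hypothetical class, and the counter stands in for the REALTIME_PARTITION_MISMATCH metric. One design choice in the sketch (an assumption, not taken from the PR) is to count a mismatch only the first time a foreign partition is observed, to avoid one warning per record.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: record the partitions actually observed while consuming a stream partition.
class ConsumingSegmentPartitionTracker {
  private final int streamPartitionId;
  private final int numPartitions;
  private final Set<Integer> observedPartitions = new TreeSet<>();
  private long mismatchCount;

  ConsumingSegmentPartitionTracker(int streamPartitionId, int numPartitions) {
    this.streamPartitionId = streamPartitionId;
    this.numPartitions = numPartitions;
  }

  // Called for every ingested record; the key is the value of the partition column.
  void onRecord(int key) {
    int partition = Math.floorMod(key, numPartitions);
    // Set.add returns true only the first time a partition is seen.
    if (observedPartitions.add(partition) && partition != streamPartitionId) {
      mismatchCount++;
      System.err.printf("Found new partition %d in segment consuming from stream partition %d%n",
          partition, streamPartitionId);
    }
  }

  // Partition info to persist in the segment metadata when the segment is committed.
  Set<Integer> getObservedPartitions() {
    return new TreeSet<>(observedPartitions);
  }

  long getMismatchCount() {
    return mismatchCount;
  }
}
```

In this sketch the segment's partition info is the set of partitions derived from the consumed records, so a segment with mismatched rows ends up listing more than one partition instead of only the stream partition.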