feat: support and optimize Spark MERGE INTO #172
+360
−2
Background
Currently, when Spark writes data into a Lance table via the MERGE INTO syntax, it shuffles the data with segment_id as the shuffle key and writes the shuffled partitions concurrently.
During the join between the source data and the Lance target table, the joined rows are split into three categories: insert rows, update rows, and delete rows. For insert rows, the segment_id field in the intermediate join result is null, so the shuffle groups all of them under the null key. Every insert row therefore lands in the same write task, causing data skew.
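For illustration, a minimal MERGE INTO statement of the kind this path handles might look like the sketch below (the table and column names are hypothetical, not from this PR). Source rows that match no target row fall into the INSERT branch, and these are exactly the rows that carry a null segment_id in the join output.

```java
import org.apache.spark.sql.SparkSession;

public class MergeIntoExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lance-merge-into-example")
        .getOrCreate();

    // Hypothetical tables: `target` is a Lance table whose existing rows carry a
    // segment_id; `source` is the incoming batch to merge.
    spark.sql(
        "MERGE INTO target t "
        + "USING source s "
        + "ON t.id = s.id "
        // Matched rows keep the target's segment_id, so the shuffle clusters them per segment.
        + "WHEN MATCHED THEN UPDATE SET * "
        // Unmatched source rows have no target segment_id, so their shuffle key is null.
        + "WHEN NOT MATCHED THEN INSERT *");

    spark.stop();
  }
}
```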
Solution
Modify the requiredDistribution() method and reconstruct the expression passed to Distributions.clustered(new NamedReference[] {segmentId}) so that a random value is used as the shuffle key whenever segment_id is null. Insert rows are then spread evenly across write tasks, while update and delete rows still cluster by their original segment_id.
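A minimal sketch of that distribution change, assuming the Spark DataSource V2 Expressions/Distributions API and a segment_id column; the PR may build the null-safe clustering expression differently (for example at the Catalyst level), so treat the function names below as assumptions rather than the actual implementation:

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;

final class MergeWriteDistribution {

  // Before: clustering purely on segment_id sends every insert row
  // (segment_id IS NULL) to the same write task.
  static Distribution before() {
    NamedReference segmentId = Expressions.column("segment_id");
    return Distributions.clustered(new NamedReference[] {segmentId});
  }

  // After (sketch): fall back to a random value when segment_id is null so
  // insert rows spread across write tasks. Whether Spark can resolve these
  // function names when planning the write is an assumption of this sketch.
  static Distribution after() {
    NamedReference segmentId = Expressions.column("segment_id");
    Expression randomFallback = Expressions.apply("rand");
    Expression clustering = Expressions.apply("coalesce", segmentId, randomFallback);
    return Distributions.clustered(new Expression[] {clustering});
  }
}
```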
Before
(image)

After
(image)