Describe the bug
In the current Blaze integration with Uniffle, when AQE (Adaptive Query Execution) is enabled and a RebalancePartitions operation is triggered (especially `rebalance_partitions_by_col`), the shuffle reader may fetch duplicated shuffle data.
The issue is that `BlazeUniffleShuffleReader` takes `taskIdBitmap` and `partitionToExpectBlocks` directly from the upstream `RssShuffleReader` without filtering them by `startMapIndex` and `endMapIndex`. When data is skewed, AQE splits a single partition into multiple shuffle read tasks, each responsible for a different map-output range; without correct taskId filtering, every one of those tasks reads all blocks of the partition, so the output is duplicated N times.
Disabling AQE, or disabling only the skew optimization or the rebalance operation, avoids the issue, which further confirms the root cause.
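For illustration, here is a minimal sketch of the kind of filtering that appears to be missing. The types and names (`BlockInfo`, `mapIndex`, `taskAttemptId`) are hypothetical simplifications, not the actual Blaze/Uniffle API; the point is only that the expected blocks and the taskId set should be restricted to the `[startMapIndex, endMapIndex)` range that AQE assigns to the current shuffle read task.

```scala
// Hypothetical, simplified model -- not the real Blaze/Uniffle types.
// It only illustrates restricting the expected blocks and task ids to the
// map-output range [startMapIndex, endMapIndex) assigned by AQE.
final case class BlockInfo(mapIndex: Int, taskAttemptId: Long, blockId: Long)

object MapRangeFilterSketch {

  /** Keep only blocks produced by map outputs inside the assigned range. */
  def filterExpectedBlocks(
      allExpectedBlocks: Seq[BlockInfo],
      startMapIndex: Int,
      endMapIndex: Int): Seq[BlockInfo] =
    allExpectedBlocks.filter { b =>
      b.mapIndex >= startMapIndex && b.mapIndex < endMapIndex
    }

  /** Derive the taskId set from the filtered blocks instead of reusing the full bitmap. */
  def filterTaskIds(
      allExpectedBlocks: Seq[BlockInfo],
      startMapIndex: Int,
      endMapIndex: Int): Set[Long] =
    filterExpectedBlocks(allExpectedBlocks, startMapIndex, endMapIndex)
      .map(_.taskAttemptId)
      .toSet
}
```

Without this kind of restriction, every split of a skewed partition reads the same full block set, which matches the N-times duplication described below.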
To Reproduce
Steps to reproduce the behavior:
1. Use the Blaze + Uniffle integration
2. Enable `spark.sql.adaptive.enabled=true` and `spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true`
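A minimal session setup for reproduction might look like the sketch below. Only the two AQE settings from the step above are shown; Blaze and the Uniffle shuffle manager also have to be enabled, but their configuration keys depend on the deployment and are omitted here.

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the AQE settings relevant to this bug. Blaze and Uniffle must
// additionally be enabled according to their own setup guides (their config
// keys are intentionally omitted here).
val spark = SparkSession.builder()
  .appName("blaze-uniffle-aqe-repro")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
  .getOrCreate()
```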
3. Prepare a skewed source table:
```sql
DROP TABLE IF EXISTS blaze_db.marl_test_skew_big_source;
DROP TABLE IF EXISTS blaze_db.marl_test_skew_big_target;

CREATE TABLE blaze_db.marl_test_skew_big_source (
  user_id BIGINT,
  product_id STRING
)
USING PARQUET
PARTITIONED BY (dt STRING);

CREATE TABLE blaze_db.marl_test_skew_big_target LIKE blaze_db.marl_test_skew_big_source;

INSERT INTO blaze_db.marl_test_skew_big_source
SELECT
  CASE WHEN rand() < 0.8 THEN 1 ELSE cast(rand() * 1000000 AS BIGINT) END AS user_id,
  concat('product_', cast(rand() * 10000 AS STRING)) AS product_id,
  '20240425' AS dt
FROM range(0, 20000000);
```
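To confirm the generated skew (roughly 80% of the rows land on `user_id = 1`), a quick check along these lines can be run against the source table defined above:

```scala
// Sanity check: the top user_id should hold roughly 80% of the 20M rows.
spark.sql(
  """
    |SELECT user_id, count(*) AS cnt
    |FROM blaze_db.marl_test_skew_big_source
    |WHERE dt = '20240425'
    |GROUP BY user_id
    |ORDER BY cnt DESC
    |LIMIT 5
  """.stripMargin).show()
```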
4. Run a query that triggers an AQE rebalance:
```sql
INSERT OVERWRITE TABLE blaze_db.marl_test_skew_big_target
PARTITION (dt)
SELECT
  user_id,
  product_id,
  dt
FROM blaze_db.marl_test_skew_big_source
WHERE dt = '20240425';
```
Expected behavior
Each shuffle read task should pull only the blocks for its assigned partition and its expected map-output range (`startMapIndex` to `endMapIndex`), with no duplication.
Actual behavior
Every shuffle read task pulls all shuffle blocks for the partition, so the output is duplicated N times, where N is the number of shuffle read tasks generated for that partition.
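One way to observe the duplication is to compare the row counts of the source and target partitions after the insert; the counts should match once the reader filters blocks correctly. A simple check, assuming the tables from the reproduction steps:

```scala
// With the bug present, the target partition holds more rows than the source
// (duplicated shuffle data); the two counts should be equal when fixed.
val sourceCnt = spark.sql(
  "SELECT count(*) FROM blaze_db.marl_test_skew_big_source WHERE dt = '20240425'"
).first().getLong(0)
val targetCnt = spark.sql(
  "SELECT count(*) FROM blaze_db.marl_test_skew_big_target WHERE dt = '20240425'"
).first().getLong(0)
println(s"source=$sourceCnt target=$targetCnt ratio=${targetCnt.toDouble / sourceCnt}")
```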