
Incorrect shuffle data fetch under AQE Rebalance when using Uniffle in Blaze causes data duplication #975

Description

Describe the bug

In the current Blaze integration with Uniffle, when AQE (Adaptive Query Execution) is enabled and a RebalancePartitions operation is triggered (especially `rebalance_partitions_by_col`), the shuffle reader may fetch duplicate shuffle data.

This happens because `BlazeUniffleShuffleReader` retrieves `taskIdBitmap` and `partitionToExpectBlocks` directly from the upstream `RssShuffleReader` without filtering them by `startMapIndex` and `endMapIndex`. Under data skew, AQE generates multiple shuffle read tasks for the same partition; without correct taskId filtering, every task may read every block, duplicating the output data N times.

Disabling AQE, or specifically disabling skew optimization or rebalance operations, avoids the issue, which further supports this root cause.
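To illustrate the missing filtering, here is a minimal, self-contained Scala sketch. All types and names below (`ShuffleBlock`, `filterByMapRange`) are hypothetical stand-ins for illustration, not the actual Uniffle or Blaze APIs:

```scala
// Hypothetical stand-in for a shuffle block descriptor: the real Uniffle
// metadata differs, but each block is attributable to the map task (and
// task attempt) that produced it.
case class ShuffleBlock(taskAttemptId: Long, mapIndex: Int, blockId: Long)

object MapRangeFilter {
  // With AQE skew splitting, each reader is assigned a half-open map-index
  // range [startMapIndex, endMapIndex). Keeping only the blocks whose
  // producing map task falls inside that range (and the matching task ids
  // for the taskIdBitmap) stops every split reader from pulling the whole
  // partition.
  def filterByMapRange(
      allBlocks: Seq[ShuffleBlock],
      startMapIndex: Int,
      endMapIndex: Int): (Seq[ShuffleBlock], Set[Long]) = {
    val expected = allBlocks.filter(b =>
      b.mapIndex >= startMapIndex && b.mapIndex < endMapIndex)
    (expected, expected.map(_.taskAttemptId).toSet)
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      ShuffleBlock(taskAttemptId = 100L, mapIndex = 0, blockId = 1L),
      ShuffleBlock(taskAttemptId = 101L, mapIndex = 1, blockId = 2L),
      ShuffleBlock(taskAttemptId = 102L, mapIndex = 2, blockId = 3L))

    // Two AQE-split readers over the same partition: with the filter they
    // read disjoint map ranges; without it, both would read all 3 blocks
    // and the partition's data would be emitted twice.
    println(filterByMapRange(blocks, 0, 2)) // map tasks 0 and 1
    println(filterByMapRange(blocks, 2, 3)) // map task 2 only
  }
}
```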

To Reproduce
Steps to reproduce the behavior:

1. Run Spark with the Blaze + Uniffle integration
2. Enable `spark.sql.adaptive.enabled=true` and `spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true` (see the session sketch after these steps)
3. Prepare a skewed source table:
```sql
DROP TABLE IF EXISTS blaze_db.marl_test_skew_big_source;

DROP TABLE IF EXISTS blaze_db.marl_test_skew_big_target;

CREATE TABLE blaze_db.marl_test_skew_big_source (
  user_id BIGINT,
  product_id STRING
)
USING PARQUET
PARTITIONED BY (dt STRING);

CREATE TABLE blaze_db.marl_test_skew_big_target LIKE blaze_db.marl_test_skew_big_source;

INSERT INTO blaze_db.marl_test_skew_big_source
SELECT
  CASE WHEN rand() < 0.8 THEN 1 ELSE cast(rand() * 1000000 AS BIGINT) END AS user_id,
  concat('product_', cast(rand() * 10000 AS STRING)) AS product_id,
  '20240425' AS dt
FROM range(0, 20000000);
```

4. Run a query that triggers an AQE rebalance:
```sql
INSERT OVERWRITE TABLE blaze_db.marl_test_skew_big_target
PARTITION (dt)
SELECT
  user_id,
  product_id,
  dt
FROM blaze_db.marl_test_skew_big_source
WHERE dt = '20240425';
```
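For step 2, a session setup along these lines can be used. Only the two `spark.sql.adaptive.*` keys are required by this report; the Uniffle client settings below (shuffle-manager class and coordinator quorum) are assumptions about a typical deployment and must be adapted to the actual Blaze + Uniffle packaging:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of a repro session (e.g. pasted into spark-shell).
val spark = SparkSession.builder()
  .appName("blaze-uniffle-aqe-duplication-repro")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
  // Assumed Uniffle client configuration; adjust to your cluster. The exact
  // shuffle-manager class depends on how the Blaze + Uniffle integration is
  // wired in.
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
  .config("spark.rss.coordinator.quorum", "<coordinator-host>:19999")
  .enableHiveSupport()
  .getOrCreate()
```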

Expected behavior

Each shuffle read task should pull only the data for its assigned partition and its expected map-output range, without duplication.

Actual behavior

All shuffle read tasks pull the full set of shuffle blocks for the partition, so the output is duplicated N times, where N is the number of shuffle read tasks generated for that partition.
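Assuming the repro tables above, a quick row-count comparison (e.g. in spark-shell) makes the duplication factor visible; with the bug present, the ratio comes out as an integer N > 1:

```scala
// Compare source and target row counts for the repro partition. A correct
// run yields ratio = 1.0; with the bug, the target holds N copies of the data.
val src = spark.sql(
  "SELECT count(*) FROM blaze_db.marl_test_skew_big_source WHERE dt = '20240425'"
).first().getLong(0)
val tgt = spark.sql(
  "SELECT count(*) FROM blaze_db.marl_test_skew_big_target WHERE dt = '20240425'"
).first().getLong(0)
println(s"source=$src, target=$tgt, ratio=${tgt.toDouble / src}")
```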

Screenshots

(screenshot attached in the original issue)

