
Flink: add more sink shuffling support  #6303

Closed as not planned

@stevenzwu

Description

Feature Request / Improvement

Today, the Flink Iceberg sink only supports a simple keyBy hash distribution on partition columns. In practice, keyBy shuffling on partition values doesn't work very well.

We can make the following shuffling enhancements in the Flink streaming writer. More details can be found in the design doc. This is an umbrella issue for tracking purposes. Here are the rough phases.

  1. [hash distribution] Custom partitioner on bucket values. PR 4228 demonstrated that keyBy on low-cardinality partitioning buckets resulted in skewed traffic distribution. The Flink sink can add a custom partitioner that directly maps the bucket value (an integer) to a downstream writer task (an integer) in a round-robin fashion (mod). This is a relatively simple case.

This is the case when write.distribution-mode=hash and there is a bucketing partition column. Other partition columns (like an hourly partition) are ignored for shuffling. The assumption is that the bucketing column is where we want to distribute/cluster the rows.
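
A rough sketch of what such a custom partitioner could look like, assuming the Iceberg bucket value has already been computed for each row (the class name and the bucket extraction are illustrative, not the actual implementation):

```java
import org.apache.flink.api.common.functions.Partitioner;

// Illustrative sketch only: maps the Iceberg bucket value (an integer in
// [0, numBuckets)) directly to a downstream writer subtask via mod, instead
// of relying on keyBy's hash, which skews when the bucket cardinality is low.
public class BucketPartitioner implements Partitioner<Integer> {
  @Override
  public int partition(Integer bucketValue, int numPartitions) {
    // Round-robin style assignment: bucket i goes to writer (i mod parallelism).
    return bucketValue % numPartitions;
  }
}
```

It would be wired in with something like `rowDataStream.partitionCustom(new BucketPartitioner(), row -> bucketValueOf(row))`, where `bucketValueOf` is just a placeholder for extracting the bucket value from the row.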

  2. [hash distribution] Bin packing based on traffic distribution statistics. This works well for skewed data on partition columns (like event time). It requires calculating traffic distribution statistics across partition columns and using those statistics to guide the shuffling decision.

This is the case when write.distribution-mode=hash and there is NO bucketing partition column.
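
A rough sketch of the bin-packing idea, assuming the traffic statistics are available as a count per partition value (class and method names are illustrative):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

// Illustrative sketch only: assigns each partition value to the currently
// least-loaded writer subtask, heaviest partitions first.
public class GreedyBinPacker {

  // Writer subtask with its accumulated traffic weight.
  private static final class Writer {
    final int subtaskId;
    long assignedWeight;
    Writer(int subtaskId) { this.subtaskId = subtaskId; }
  }

  // Returns a map from partition value to writer subtask index.
  public static Map<String, Integer> assign(Map<String, Long> trafficStats, int parallelism) {
    PriorityQueue<Writer> writers =
        new PriorityQueue<>(Comparator.comparingLong((Writer w) -> w.assignedWeight));
    for (int i = 0; i < parallelism; i++) {
      writers.add(new Writer(i));
    }

    // Sort partition values by observed traffic, heaviest first.
    List<Map.Entry<String, Long>> sorted = trafficStats.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .collect(Collectors.toList());

    Map<String, Integer> assignment = new HashMap<>();
    for (Map.Entry<String, Long> entry : sorted) {
      Writer least = writers.poll();          // least-loaded writer so far
      assignment.put(entry.getKey(), least.subtaskId);
      least.assignedWeight += entry.getValue();
      writers.add(least);
    }
    return assignment;
  }
}
```

Heaviest-first, least-loaded assignment keeps the per-writer traffic roughly balanced even when a few partition values dominate.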

  3. [range distribution] Range partition based on traffic distribution statistics. This is a variant of 2 above. It works well for "sorting" on non-partition columns (e.g. country code, event type) and can improve data clustering by creating data files with narrow value ranges. Note that the Flink streaming writer probably won't sort rows within a file, as that would be very expensive (though not impossible). Even without rows sorted within a file, the improved data clustering can result in effective file pruning. We just can't get the additional benefit of row-group-level skipping (for Parquet) that sorted rows within a file would provide.

This is the case when write.distribution-mode=range and a SortOrder is defined on non-partition columns. Partition columns are ignored for range shuffling, as the assumption is that the non-partition sort columns are what matter here.
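
A rough sketch of deriving range boundaries from the aggregated statistics, assuming the sort key is a single string column and the statistics are already sorted by key (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: derives range boundaries for the sort key so that
// each writer subtask receives a contiguous, roughly equal-weight key range.
public class RangeAssigner {
  private final List<String> boundaries;   // upper bound of each writer's range, in key order

  public RangeAssigner(TreeMap<String, Long> sortedTrafficStats, int parallelism) {
    long total = sortedTrafficStats.values().stream().mapToLong(Long::longValue).sum();
    long target = Math.max(1, total / parallelism);

    this.boundaries = new ArrayList<>();
    long running = 0;
    for (Map.Entry<String, Long> entry : sortedTrafficStats.entrySet()) {
      running += entry.getValue();
      if (running >= target && boundaries.size() < parallelism - 1) {
        boundaries.add(entry.getKey());   // close the current range at this key
        running = 0;
      }
    }
  }

  // Routes a sort-key value to the writer whose range contains it.
  public int subtaskFor(String sortKey) {
    int idx = 0;
    while (idx < boundaries.size() && sortKey.compareTo(boundaries.get(idx)) > 0) {
      idx++;
    }
    return idx;
  }
}
```

Each writer then receives a contiguous key range, so the data files it produces cover narrow value ranges even without sorting rows within a file.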

  4. [high cardinality columns] 2 and 3 above are mostly for low-cardinality columns (e.g. hundreds of unique values), where a simple dictionary of count per value can be used to track traffic distribution statistics. For high-cardinality columns (like device or user id), we would need a probabilistic data sketches algorithm to calculate the traffic distribution.
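
As a stand-in for a proper probabilistic sketch (e.g. a quantile sketch), a fixed-size reservoir sample illustrates the idea of estimating range boundaries for a high-cardinality key without tracking exact per-value counts (all names are illustrative, not the proposed implementation):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative sketch only: reservoir sampling over observed key values,
// with range boundaries read off as sample quantiles.
public class ReservoirQuantiles {
  private final List<Long> reservoir = new ArrayList<>();
  private final int capacity;
  private final Random random = new Random();
  private long seen = 0;

  public ReservoirQuantiles(int capacity) {
    this.capacity = capacity;
  }

  // Standard reservoir sampling: each observed key survives with probability capacity/seen.
  public void update(long key) {
    seen++;
    if (reservoir.size() < capacity) {
      reservoir.add(key);
    } else {
      long slot = (long) (random.nextDouble() * seen);
      if (slot < capacity) {
        reservoir.set((int) slot, key);
      }
    }
  }

  // Returns numPartitions - 1 boundaries splitting the sampled keys into equal-size ranges.
  public List<Long> boundaries(int numPartitions) {
    List<Long> sorted = new ArrayList<>(reservoir);
    if (sorted.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.sort(sorted);
    List<Long> bounds = new ArrayList<>();
    for (int i = 1; i < numPartitions; i++) {
      bounds.add(sorted.get(i * sorted.size() / numPartitions));
    }
    return bounds;
  }
}
```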

Query engine

Flink
