Description
Feature Request / Improvement
Today, Flink Iceberg sink only supports simple keyBy hash distribution on partition columns. In practice, keyBy shuffle on partition values doesn't work very well.
We can make the following shuffling enhancements in Flink streaming writer. More details can be found in the design doc. This is an uber issue for tracking purpose. Here are the rough phases.
- [hash distribution] custom partitioner on bucket values. PR 4228 demonstrated that keyBy on low-cardinality partitioning buckets resulted in skewed traffic distribution. Flink sink can add a custom partitioner that directly map the bucket value (integer) to the downstream writer tasks (integer) in round-robin fashion (mod). This is a relatively simple case.
This is a case when write.distribution-mode=hash
and there is a bucketing partition column. Other partition columns (like hourly partition) will be ignored regarding shuffling. The assumption is that bucketing column is where we want to distribute/cluster the rows.
- [hash distribution] bin packing based on traffic distribution statistics. This works well for skewed data on partition columns (like event time). This requires calculating traffic distribution statistics across partition columns and use the statistics to guide shuffling decision.
This is a case when write.distribution-mode=hash
and there is NO bucketing partition column.
- [range distribution] range partition based on traffic distribution statistics. It is a variant of 2 above. This works well for "sorting" non-partition columns (e.g. country code, event type). It can improve data clustering by creating data files with narrow value ranges. Note that Flink streaming writer probably won't sort rows within a file, as that would be very expensive (not impossible). Even without rows sorted within a file, the improved data clustering can result in effective file pruning. We just can't get the additional benefits of row group level skipping (for Parquet) with rows sorted within a file.
This is a case when write.distribution-mode=range
and SortOrder
is defined for non-partition columns. partition columns will be ignored for range shuffling as the assumption is that non-partition sort columns are what matter here.
- [high cardinality columns] 2 and 3 above are mostly for low-cardinality columns (e.g. unique values in hundreds), where a simple dictionary of count per value can be used to track traffic distribution statistics. For high-cardinality column (like device or user id), we would need to use probabilistic data sketches algorithm to calculate traffic distribution.
Query engine
Flink