[FLINK-38073][docs] Add documentation for the MultiJoin operator #26775
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -302,3 +302,69 @@ The execution of mini-batch join operator are as shown in the figure below. | |
MiniBatch optimization is disabled by default for regular join. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details. | ||
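As a minimal sketch of the three options named above, the statements below enable mini-batching for a SQL session. The latency and size values are illustrative assumptions, not recommendations; tune them for your workload.

```sql
-- Enable mini-batch processing (disabled by default for regular joins).
SET 'table.exec.mini-batch.enabled' = 'true';
-- Maximum time to buffer records before flushing a batch (illustrative value).
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- Maximum number of records buffered per batch (illustrative value).
SET 'table.exec.mini-batch.size' = '5000';
```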
|
||
{{< top >}} | ||
|
||
## Multiple Regular Joins | ||
ditto |
||
|
||
{{< label Streaming >}} | ||
|
||
Streaming Flink jobs with multiple non-temporal regular joins often experience operational instability and performance degradation due to large state sizes. This is often because the intermediate state created by a chain of joins is much larger than the input state itself. In Flink 2.1, we introduce a new multi-join operator, an optimization designed to significantly reduce state size and improve performance for join pipelines that involve record amplification and large intermediate state. This new operator eliminates the need to store intermediate state for joins across multiple tables by processing joins across various input streams simultaneously. This "zero intermediate state" approach primarily targets state reduction, offering substantial benefits in resource consumption and operational stability. | ||
I think it would be better we could use the unified keyword
I think the "multi-join (MultiJoin)" combined with "operator" is well understood in the docs. Moreover, the system option can be set as
Which uses |
||
|
||
In most joins, a significant portion of processing time is spent fetching records from the state. The efficiency gain of the MultiJoin operator largely depends on the size of the intermediate state that a chain of binary joins would produce. In a common scenario where a pipeline experiences record amplification (each join produces more data and records than the previous one), the MultiJoin operator is more efficient. This is because it keeps the state the operator interacts with much smaller, leading to a more stable operator. If a chain of joins actually produces less state than the original records, the MultiJoin operator will still use less state overall. However, in this specific case, binary joins might perform better because the state that the final joins need to operate on is smaller. | |
|
||
### The MultiJoin Operator | ||
The main benefits of the MultiJoin operator are: | ||
ditto, multiway join? |
||
|
||
1) Considerably smaller state size due to zero intermediate state. | ||
2) Improved performance for chained joins with record amplification. | ||
3) Improved stability: state grows linearly with the number of records processed, instead of polynomially as with binary joins. | |
|
||
Also, pipelines that use MultiJoin instead of binary joins usually have faster initialization and recovery times due to smaller state and fewer nodes. | |
|
||
### When to enable the MultiJoin? | ||
|
||
If your job has multiple joins that share at least one common join key, and you observe that the state of the intermediate joins is larger than that of the input sources, consider enabling the MultiJoin operator. | |
|
||
### How to enable the MultiJoin? | ||
|
||
To enable this optimization, set the following configuration: | |
|
||
```sql | ||
SET 'table.optimizer.multi-join.enabled' = 'true'; | ||
``` | ||
|
||
Important: This feature is currently experimental. Some optimizations are still open, and breaking changes might be introduced in this version. Only streaming INNER/LEFT joins are currently supported; support for RIGHT joins will be added soon. Because records are partitioned by key, at least one key must be shared between the join conditions: | |
|
||
- Supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by key) | ||
- Supported: A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by key via transitivity) | ||
- Not supported: A JOIN B ON A.key1 = B.key1 JOIN C ON B.key2 = C.key2 (No single key allows partitioning A, B, and C together in a single operator. This will be split into multiple MultiJoin operators) | ||
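The shared-key requirement above can be checked on a small query before enabling the option in production. The following is a minimal sketch: the tables `Orders`, `Customers`, and `Payments` and their columns are hypothetical and exist only for illustration, and the `datagen` connector is used so the statements are self-contained. Whether the optimizer actually chooses a multi-join node for a given query, and how that node is named in the plan output, should be verified against your Flink version.

```sql
-- Hypothetical source tables (names and schemas are assumptions for illustration).
CREATE TABLE Orders (customer_id INT, order_id INT) WITH ('connector' = 'datagen');
CREATE TABLE Customers (customer_id INT, name STRING) WITH ('connector' = 'datagen');
CREATE TABLE Payments (customer_id INT, amount DOUBLE) WITH ('connector' = 'datagen');

-- Enable the experimental optimization for this session.
SET 'table.optimizer.multi-join.enabled' = 'true';

-- All join conditions share customer_id, matching the first "Supported" pattern.
-- Inspect the printed plan to see whether the joins were merged into one operator.
EXPLAIN PLAN FOR
SELECT o.order_id, c.name, p.amount
FROM Orders o
JOIN Customers c ON o.customer_id = c.customer_id
JOIN Payments p ON o.customer_id = p.customer_id;
```

Changing the last condition to join on a column unrelated to `customer_id` would correspond to the "Not supported" pattern, where the plan is expected to be split into more than one operator.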
|
||
### MultiJoin Operator Example - Benchmark | ||
|
||
Here's a benchmark of a 10-way join comparing the default binary joins with the MultiJoin operator. You can observe the amount of intermediate state in the first section, the number of records processed when the operators reach 100% busyness in the second section, and the checkpoints in the third. | |
Is the bench mark code in open source, If so we should point to it here in the docs.
Not yet. These are the initial local benchmarks. Ideally we'll have something in https://github.com/apache/flink-benchmarks and then we'll link it here. |
||
|
||
{{< img src="/fig/table-streaming/multijoin_operator.png" height="100%" >}} | ||
|
||
For this 10-way join above, involving record amplification, we've observed significant improvements. Here are some rough numbers: | ||
I suggest mentioning what environment these numbers were seen on, CPUs, RAM, storage etc.
Most relevant are the specs of the TaskManager. Nonetheless, I've complemented the doc with all infos |
||
|
||
- Performance: 2x to over 100x increase in processed records when both are at 100% busyness. | |
- State Size: 3x to over 1000x smaller as intermediate state grows. | |
|
||
The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrades while the MultiJoin operator remains stable and outperforms them. | |
I wonder whether another consideration/benefit might be around recovery, previously there was intermediate state which could be recovered , but now there is none, so there is much less state to recover. Would it be true to say we could have faster recovery for the cases where MultiJoin is most useful?
Yes, this is one of the benefits of using multi-way joins. |
||
|
||
This general benchmark for the 10-way join was run with the following configuration: 10 upsert Kafka topics, parallelism of 10, and 1 record per second per topic. We used RocksDB with unaligned and incremental checkpoints. Each job ran in one TaskManager with 8GB process memory, 1GB off-heap memory and 20% network memory. The JobManager had 4GB process memory. The host machine had an M1 processor, 32GB RAM and a 1TB SSD. The sink uses a blackhole connector so we only benchmark the joins. The SQL used to generate the benchmark data had this structure: | |
|
||
```sql | ||
INSERT INTO JoinResultsMJ | ||
SELECT *all fields* | ||
FROM TenantKafka t | ||
LEFT JOIN SuppliersKafka s ON t.tenant_id = s.tenant_id AND ... | ||
LEFT JOIN ProductsKafka p ON t.tenant_id = p.tenant_id AND ... | ||
LEFT JOIN CategoriesKafka c ON t.tenant_id = c.tenant_id AND ... | ||
LEFT JOIN OrdersKafka o ON t.tenant_id = o.tenant_id AND ... | ||
LEFT JOIN CustomersKafka cust ON t.tenant_id = cust.tenant_id AND ... | ||
LEFT JOIN WarehousesKafka w ON t.tenant_id = w.tenant_id AND ... | ||
LEFT JOIN ShippingKafka sh ON t.tenant_id = sh.tenant_id AND ... | ||
LEFT JOIN PaymentKafka pay ON t.tenant_id = pay.tenant_id AND ... | ||
LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...; | ||
``` | ||
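For readers who want to approximate the setup described above, here is a rough sketch of the session settings and the blackhole sink. The configuration key names are assumptions based on Flink 2.x naming and should be checked against the configuration reference for your version; the sink columns are hypothetical placeholders, since the benchmark schema is not given here. The memory settings of the TaskManager and JobManager belong in the cluster configuration and are not shown.

```sql
-- Assumed Flink 2.x key names; verify against your version's configuration docs.
SET 'parallelism.default' = '10';
SET 'state.backend.type' = 'rocksdb';
SET 'execution.checkpointing.incremental' = 'true';
SET 'execution.checkpointing.unaligned.enabled' = 'true';

-- Toggle this between 'true' and 'false' to compare MultiJoin with binary joins.
SET 'table.optimizer.multi-join.enabled' = 'true';

-- Blackhole sink: discards all rows so that only the joins are measured.
-- Column list is a hypothetical placeholder; it must match the SELECT of the INSERT.
CREATE TABLE JoinResultsMJ (
  tenant_id STRING,
  supplier_id STRING
) WITH (
  'connector' = 'blackhole'
);
```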
|
Multiway Regular Joins? What do you think about this title?
This refers to many chained (normal) regular joins that are supported in Flink. The Multiway Regular Joins is not applicable in this context.