Skip to content

[FLINK-38073][docs] Add documentation for the MultiJoin operator #26775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 17, 2025

Conversation

gustavodemorais
Copy link
Contributor

What is the purpose of the change

Add documentation on what the new operator is, when to use it and how.

Brief change log

  • Add documentation on what the new operator is, when to use and how.

Verifying this change

Check the generated documentation.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (docs)

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 9, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@lsyldliu
Copy link
Contributor

@gustavodemorais Can we merge this pr in next week, otherwise, it will affect the Flink 2.1 release.


### MultiJoin Operator Example - Benchmark

Here's a 10-way benchmark between the default binary joins and the MultiJoin operator. You can observe the amount of intermediate state in the first section, the amount of records processed when the operators reach 100% busyness in the second section, and the checkpoints in the third.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the bench mark code in open source, If so we should point to it here in the docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet. These are the initial local benchmarks. Ideally we'll have something in https://github.com/apache/flink-benchmarks and then we'll link it here.


{{< img src="/fig/table-streaming/multijoin_operator.png" height="100%" >}}

For this 10-way join above, involving record amplification, we've observed significant improvements. Here are some rough numbers:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest mentioning what environment these numbers were seen on, CPUs, RAM, storage etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most relevant are the specs of the TaskManager. Nonetheless, I've complemented the doc with all infos

- Performance: 2x to over 100x+ increase in processed records when both at 100% busyness.
- State Size: 3x to over 1000x+ smaller as intermediate state grows.

The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrade and the multi join remains stable and outperforms.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether another consideration/benefit might be around recovery, previously there was intermediate state which could be recovered , but now there is none, so there is much less state to recover. Would it be true to say we could have faster recovery for the cases where MultiJoin is most useful?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is one of the benefits of using multi-way joins.

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 11, 2025
@lsyldliu
Copy link
Contributor

cc @raminqaf, Can you help push this pr forward, we need to merge it this week, otherwise, it will affect the Flink 2.1 release.

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 14, 2025
@gustavodemorais
Copy link
Contributor Author

I've answered the questions and complemented the doc. I'm currently out of office. Thanks for picking it up @raminqaf

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 15, 2025
Copy link
Contributor

@lsyldliu lsyldliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gustavodemorais @raminqaf Thanks for your contribution, I left some comments.

@@ -83,6 +83,9 @@ FULL OUTER JOIN Product
ON Orders.product_id = Product.id
```

### Multiple Regular Joins
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiway Regular Joins? What do you think about this title?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refers to many chained (normal) regular joins that are supported in Flink. The Multiway Regular Joins is not applicable in this context.


{{< label Streaming >}}

Streaming Flink jobs with multiple non-temporal regular joins often experience operational instability and performance degradation due to large state sizes. This is often because the intermediate state created by a chain of joins is much larger than the input state itself. In Flink 2.1, we introduce a new multi-join operator, an optimization designed to significantly reduce state size and improve performance for join pipelines that involve record amplification and large intermediate state. This new operator eliminates the need to store intermediate state for joins across multiple tables by processing joins across various input streams simultaneously. This "zero intermediate state" approach primarily targets state reduction, offering substantial benefits in resource consumption and operational stability.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better we could use the unified keyword multiway join instead of MultiJoin in this docs, what do you think about this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the "multi-join (MultiJoin)" combined with "operator" is well understood in the docs. Moreover, the system option can be set as

SET 'table.optimizer.multi-join.enabled' = 'true';

Which uses multi-join and keeps the consistency.

In most joins, a significant portion of processing time is spent fetching records from the state. The efficiency of the MultiJoin operator largely depends on the size of this intermediate state. In a common scenario where a pipeline experiences record amplification—meaning each join produces more data and records than the previous one, the MultiJoin operator is more efficient. This is because it keeps the state on which the operator interacts much smaller, leading to a more stable operator. If a chain of joins actually produces less state than the original records, the MultiJoin operator will still use less state overall. However, in this specific case, binary joins might perform better because the state that the final joins need to operate on is smaller.

### The MultiJoin Operator
The main benefits of the MultiJoin operator are:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, multiway join?

@@ -302,3 +302,69 @@ The execution of mini-batch join operator are as shown in the figure below.
MiniBatch optimization is disabled by default for regular join. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.

{{< top >}}

## Multiple Regular Joins
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 17, 2025
@lsyldliu lsyldliu merged commit a7aabb8 into apache:master Jul 17, 2025
lsyldliu pushed a commit that referenced this pull request Jul 17, 2025
)

* [FLINK-38073][docs] Add documentation for the MultiJoin operator

* [FLINK-38073][docs] Complement with TaskManager/JobManager/Host specs

(cherry picked from commit a7aabb8)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants