[Feature][Zeta] Support Parallelism Inference #10102
base: dev
Conversation
**Server-level configuration example**

Configure in `seatunnel.yaml`:

```yaml
seatunnel:
  engine:
    parallelism-inference:
      enabled: true
      max-parallelism: 100
```

**Job-level configuration example**

Configure in the `env` block of the job configuration file:

```hocon
env {
  # Enable parallelism inference
  parallelism.inference.enabled = true
  # Set the maximum parallelism
  parallelism.inference.max-parallelism = 50
}
```
Add some priority notes
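The priority between the two configuration levels can be sketched as follows. This is an illustrative sketch, not SeaTunnel's actual classes: the method and class names here are hypothetical, but the resolution rule (job-level `env` setting wins over the server-level `seatunnel.yaml` default) mirrors the `.orElse(...)` pattern used later in this PR.

```java
import java.util.Optional;

public class ParallelismInferencePriority {
    // Illustrative only: the job-level value, when present, takes priority
    // over the server-level default from seatunnel.yaml.
    static boolean effectiveEnabled(Optional<Boolean> jobLevel, boolean serverLevel) {
        return jobLevel.orElse(serverLevel);
    }

    public static void main(String[] args) {
        // Job-level value wins when present.
        System.out.println(effectiveEnabled(Optional.of(false), true)); // false
        // Falls back to the server-level setting otherwise.
        System.out.println(effectiveEnabled(Optional.empty(), true));   // true
    }
}
```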
```java
@Override
public int inferParallelism() {
    try {
        for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
            String tableKey = entry.getKey();
            ReadBuilder readBuilder = entry.getValue();
            try {
                List<PartitionEntry> partitionEntries =
                        readBuilder.newScan().listPartitionEntries();
                return !partitionEntries.isEmpty() ? partitionEntries.size() : 1;
            } catch (Exception e) {
                log.warn(
                        "Failed to get partition info for table {}, skipping parallelism inference",
                        tableKey,
                        e);
                return -1;
            }
        }
    } catch (Exception e) {
        log.warn("Failed to infer parallelism for Paimon source", e);
        return -1;
    }
    return 1;
}
```
Only take the first table? Could you explain the design?
```java
"Enable automatic parallelism inference based on data volume. "
        + "When enabled, operators with parallelism=-1 will have their parallelism "
        + "automatically determined based on the size of consumed data.");
```
The described logic has not been implemented
```java
"The maximum parallelism for operators when using automatic inference. "
        + "Must be a power of 2 to ensure even distribution of subpartitions.");
```
ditto
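If the cap genuinely must be a power of two, as the docstring above claims, a small helper could enforce it by rounding a configured value down. This is an illustrative sketch, not code from the PR; the method name is hypothetical:

```java
public class PowerOfTwoCap {
    // Illustrative: rounds a positive cap down to the nearest power of two,
    // e.g. 100 -> 64, matching the "power of 2" constraint in the docstring.
    static int floorPowerOfTwo(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("cap must be positive");
        }
        // Integer.highestOneBit keeps only the most significant set bit.
        return Integer.highestOneBit(n);
    }

    public static void main(String[] args) {
        System.out.println(floorPowerOfTwo(100)); // 64
        System.out.println(floorPowerOfTwo(128)); // 128
    }
}
```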
```java
        .orElse(parallelismInferenceConfig.isEnabled());

if (inferenceEnabled && source instanceof SupportParallelismInference) {
    int inferredParallelism = ((SupportParallelismInference) source).inferParallelism();
```
What if we roll back the parallelism to the default if it has issues?
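The rollback the reviewer suggests could look like the sketch below: treat any non-positive inferred value as "inference failed" and fall back to a default instead of propagating `-1`, while still capping at the configured maximum. This is a hedged illustration under those assumptions, not the PR's actual implementation:

```java
public class ParallelismFallback {
    static final int DEFAULT_PARALLELISM = 1;

    // Illustrative: -1 (or any non-positive value) from inferParallelism()
    // rolls back to the default; valid values are capped at maxParallelism.
    static int resolve(int inferred, int maxParallelism) {
        if (inferred <= 0) {
            return DEFAULT_PARALLELISM;
        }
        return Math.min(inferred, maxParallelism);
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1, 128));  // inference failed -> 1
        System.out.println(resolve(50, 128));  // within cap -> 50
        System.out.println(resolve(200, 128)); // capped -> 128
    }
}
```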
```java
public static Option<Integer> PARALLELISM_INFERENCE_MAX_PARALLELISM =
        Options.key("parallelism.inference.max-parallelism")
                .intType()
                .defaultValue(128)
```
Why is it 128 here? The document describes it as 64.
```java
// ==================== Parallelism Inference Options ====================

public static Option<Boolean> PARALLELISM_INFERENCE_ENABLED =
```
New parameters need to be added in EnvOptionRule
I have updated the code based on the review suggestions. Please take a look when you have time. @zhangshenghang
```java
public int inferParallelism() {
    int inferParallelism = 0;
    try {
        for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
            String tableKey = entry.getKey();
            ReadBuilder readBuilder = entry.getValue();
            try {
                List<PartitionEntry> partitionEntries =
                        readBuilder.newScan().listPartitionEntries();
                inferParallelism += partitionEntries.size();
            } catch (Exception e) {
                log.warn(
                        "Failed to get partition info for table {}, skipping parallelism inference",
                        tableKey,
                        e);
                return -1;
            }
        }
    } catch (Exception e) {
        log.warn("Failed to infer parallelism for Paimon source", e);
        return -1;
    }
    return inferParallelism <= 0 ? 1 : inferParallelism;
}
```
Is there a problem with this design? Is there any recommended calculation rule for the parallelism of our Sink under normal circumstances? @Hisoka-X
```java
public class ParallelismInferenceConfig implements Serializable {
    private static final long serialVersionUID = 1L;
    private boolean enabled = false;
    private int maxParallelism = 128;
```
Should it be consistent with the above here? It should be 64
Done.
```java
// Expected: 2 (table1) + 3 (table2) + 1 (table3) = 7
Assertions.assertEquals(6, parallelism);
```
Please be consistent
Purpose of this pull request
Supports automatic parallelism inference for sources that implement the `SupportParallelismInference` interface (e.g., the Paimon connector). When enabled, the engine automatically determines the optimal parallelism based on data characteristics instead of using the default parallelism value of 1.

Does this PR introduce any user-facing change?
Yes.
How was this patch tested?
Add new tests
Check list
New License Guide
`incompatible-changes.md` to describe the incompatibility caused by this PR.