Conversation

@xiaochen-zhou (Contributor) commented Nov 23, 2025

Purpose of this pull request

Supports automatic parallelism inference for sources that implement the `SupportParallelismInference` interface (e.g., the Paimon connector). When enabled, the engine automatically determines an appropriate parallelism from data characteristics instead of falling back to the default parallelism of 1.
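For context, a source opts in by implementing the `SupportParallelismInference` interface named above. The sketch below is a hypothetical minimal implementation: the interface name and the `inferParallelism()` signature come from this PR, but `PartitionCountingSource` and its partition-counting body are illustrative assumptions, not the Paimon connector's actual code.

```java
import java.util.List;

// Interface name and method signature as described in the PR.
interface SupportParallelismInference {
    // Returns the inferred parallelism, or -1 if inference failed.
    int inferParallelism();
}

// Hypothetical source that infers parallelism from per-table partition counts.
class PartitionCountingSource implements SupportParallelismInference {
    private final List<Integer> partitionCounts; // partitions per table (assumed input)

    PartitionCountingSource(List<Integer> partitionCounts) {
        this.partitionCounts = partitionCounts;
    }

    @Override
    public int inferParallelism() {
        // Sum partitions across all tables; fall back to 1 when none are found.
        int total = partitionCounts.stream().mapToInt(Integer::intValue).sum();
        return total <= 0 ? 1 : total;
    }
}
```

With this shape, the engine only needs an `instanceof SupportParallelismInference` check to decide whether inference applies to a given source.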

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Added new tests.

Check list

Comment on lines +211 to +234
**Server-level configuration example**

Configure in `seatunnel.yaml`:

```yaml
seatunnel:
  engine:
    parallelism-inference:
      enabled: true
      max-parallelism: 100
```

**Job-level configuration example**

Configure in the `env` block of the job configuration file:

```hocon
env {
  # Enable parallelism inference
  parallelism.inference.enabled = true
  # Set the maximum parallelism
  parallelism.inference.max-parallelism = 50
}
```
Member commented:

Please add some notes on the priority between these configuration levels.

Comment on lines 187 to 211
```java
@Override
public int inferParallelism() {
    try {
        for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
            String tableKey = entry.getKey();
            ReadBuilder readBuilder = entry.getValue();
            try {
                List<PartitionEntry> partitionEntries =
                        readBuilder.newScan().listPartitionEntries();
                return !partitionEntries.isEmpty() ? partitionEntries.size() : 1;
            } catch (Exception e) {
                log.warn(
                        "Failed to get partition info for table {}, skipping parallelism inference",
                        tableKey,
                        e);
                return -1;
            }
        }
    } catch (Exception e) {
        log.warn("Failed to infer parallelism for Paimon source", e);
        return -1;
    }
    return 1;
}
```
@zhangshenghang (Member) commented Nov 24, 2025:
Only take the first table? Could you explain the design?

Comment on lines 126 to 128
```java
"Enable automatic parallelism inference based on data volume. "
        + "When enabled, operators with parallelism=-1 will have their parallelism "
        + "automatically determined based on the size of consumed data.");
```
Member commented:

The described logic has not been implemented

Comment on lines 135 to 136
```java
"The maximum parallelism for operators when using automatic inference. "
        + "Must be a power of 2 to ensure even distribution of subpartitions.");
```
Member commented:

ditto

```java
        .orElse(parallelismInferenceConfig.isEnabled());

if (inferenceEnabled && source instanceof SupportParallelismInference) {
    int inferredParallelism = ((SupportParallelismInference) source).inferParallelism();
```
Member commented:

What if we roll back the parallelism to the default if it has issues?
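One way to read the rollback suggestion is sketched below. This is a hypothetical helper under assumed names, not the PR's actual engine code: any non-positive inferred value (e.g., the `-1` returned on inference failure) falls back to the default parallelism, and a successful inference is capped at the configured maximum.

```java
// Hypothetical helper illustrating the reviewer's rollback suggestion.
// All names here are illustrative; the PR's engine code may differ.
class ParallelismResolver {
    static int resolve(int inferredParallelism, int maxParallelism, int defaultParallelism) {
        if (inferredParallelism <= 0) {
            // Inference failed (inferParallelism() returned -1 or 0):
            // roll back to the configured default instead of failing the job.
            return defaultParallelism;
        }
        // Cap the inferred value at the configured maximum parallelism.
        return Math.min(inferredParallelism, maxParallelism);
    }
}
```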

```java
public static Option<Integer> PARALLELISM_INFERENCE_MAX_PARALLELISM =
        Options.key("parallelism.inference.max-parallelism")
                .intType()
                .defaultValue(128)
```
Member commented:

Why is it 128 here? The document describes it as 64.


```java
// ==================== Parallelism Inference Options ====================

public static Option<Boolean> PARALLELISM_INFERENCE_ENABLED =
```
Member commented:

New parameters need to be added in EnvOptionRule

@xiaochen-zhou (Contributor, Author) commented:

I have updated the code based on the review suggestions. Please take a look when you have time. @zhangshenghang

Comment on lines +188 to +212
```java
public int inferParallelism() {
    int inferParallelism = 0;
    try {
        for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
            String tableKey = entry.getKey();
            ReadBuilder readBuilder = entry.getValue();
            try {
                List<PartitionEntry> partitionEntries =
                        readBuilder.newScan().listPartitionEntries();
                inferParallelism += partitionEntries.size();
            } catch (Exception e) {
                log.warn(
                        "Failed to get partition info for table {}, skipping parallelism inference",
                        tableKey,
                        e);
                return -1;
            }
        }
    } catch (Exception e) {
        log.warn("Failed to infer parallelism for Paimon source", e);
        return -1;
    }
    return inferParallelism <= 0 ? 1 : inferParallelism;
}
```
Member commented:

Is there a problem with this design? Is there any recommended calculation rule for the parallelism of our Sink under normal circumstances? @Hisoka-X

```java
public class ParallelismInferenceConfig implements Serializable {
    private static final long serialVersionUID = 1L;
    private boolean enabled = false;
    private int maxParallelism = 128;
```
Member commented:

Should it be consistent with the above here? It should be 64

@xiaochen-zhou (Contributor, Author) commented:

> Should it be consistent with the above here? It should be 64

Done.

Comment on lines 134 to 135
```java
// Expected: 2 (table1) + 3 (table2) + 1 (table3) = 7
Assertions.assertEquals(6, parallelism);
```
Member commented:

Please be consistent
