3 changes: 3 additions & 0 deletions config/seatunnel.yaml
@@ -44,6 +44,9 @@ seatunnel:
enable-http: true
port: 8080
enable-dynamic-port: false
parallelism-inference:
  enabled: false
  max-parallelism: 64
# Uncomment the following lines to enable basic authentication for web UI
# enable-basic-auth: true
# basic-auth-username: admin
67 changes: 66 additions & 1 deletion docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -189,7 +189,72 @@ seatunnel:
classloader-cache-mode: true
```

- ### 4.6 Persistence Configuration of IMap (This parameter is invalid on the Worker node)
+ ### 4.6 Parallelism Inference Configuration (This parameter is invalid on the Worker node)

SeaTunnel Engine supports automatic parallelism inference for sources that implement the `SupportParallelismInference` interface (e.g., the Paimon connector). When enabled, the engine derives a suitable parallelism from the source's data characteristics instead of using a fixed value.
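
For connector developers the contract is a single method. Below is a minimal sketch of an implementing source; the `PartitionedSource` class and its partition count are illustrative only and not part of SeaTunnel, and a real connector would also implement `SeaTunnelSource`:

```java
import org.apache.seatunnel.api.source.SupportParallelismInference;

// Hypothetical example class, shown only to illustrate the
// SupportParallelismInference contract.
public class PartitionedSource implements SupportParallelismInference {

    private final int partitionCount; // e.g. discovered from the underlying storage

    public PartitionedSource(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    @Override
    public int inferParallelism() {
        // Suggest one parallel reader per partition, but never less than 1.
        return Math.max(1, partitionCount);
    }
}
```

Whatever the source returns is then capped by `max-parallelism`, described below.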

**enabled**

Whether to enable automatic parallelism inference. When enabled, sources that support parallelism inference will automatically calculate the optimal parallelism based on their data characteristics.

Default: `false`

**max-parallelism**

The upper limit on the inferred parallelism. Even if the source suggests a higher value, it is capped at this limit, which helps prevent resource over-use.

Default: `64`
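
In other words, the cap is a plain upper bound on whatever the source suggests; a one-line sketch of the assumed behavior:

```java
// Assumed capping behavior when inference applies (not the engine's actual code).
int effectiveParallelism = Math.min(inferredParallelism, maxParallelism);
```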

**Server-Level Configuration Example**

Configure in `seatunnel.yaml`:

```yaml
seatunnel:
  engine:
    parallelism-inference:
      enabled: true
      max-parallelism: 100
```

**Job-Level Configuration Example**

Configure in the job config `env` block:

```hocon
env {
  # Enable parallelism inference
  parallelism.inference.enabled = true
  # Set maximum parallelism
  parallelism.inference.max-parallelism = 50
}

source {
  Paimon {
    warehouse = "hdfs://localhost:9000/paimon"
    database = "default"
    table = "users"
  }
}
```

Note:

Parallelism configuration follows this priority order, from highest to lowest (a code sketch after the list illustrates the resolution):

1. **Job-level `env.parallelism`**: if set in the job's `env` block, this value is used as-is.
2. **Parallelism inference**: if inference is enabled and the source supports it, the inferred parallelism is used (capped by `max-parallelism`). The inference settings themselves are resolved in two layers:
   - **Job level (`env` configuration) has higher priority**: `env.parallelism.inference.enabled` overrides `parallelism-inference.enabled` in `seatunnel.yaml`, and `env.parallelism.inference.max-parallelism` overrides `parallelism-inference.max-parallelism`.
   - **Server level (`seatunnel.yaml`) serves as the default**: it applies when the job configuration does not specify a value, and is suitable for setting a unified default behavior for all jobs.
3. **Default parallelism**: if none of the above is configured, the default value of `1` is used.
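
The sketch below illustrates this resolution order. It is illustrative only, not the engine's actual code; `null` arguments stand in for "not configured at that level":

```java
// Conceptual precedence resolution for the effective source parallelism.
static int resolveParallelism(
        Integer jobParallelism,         // env.parallelism in the job config
        Boolean jobInferenceEnabled,    // env.parallelism.inference.enabled
        Integer jobMaxParallelism,      // env.parallelism.inference.max-parallelism
        boolean serverInferenceEnabled, // parallelism-inference.enabled in seatunnel.yaml
        int serverMaxParallelism,       // parallelism-inference.max-parallelism in seatunnel.yaml
        Integer inferredParallelism) {  // source.inferParallelism(), null if unsupported

    // 1. An explicit job-level parallelism always wins.
    if (jobParallelism != null) {
        return jobParallelism;
    }

    // 2. Inference: job-level settings override the server-level defaults.
    boolean enabled =
            jobInferenceEnabled != null ? jobInferenceEnabled : serverInferenceEnabled;
    int max = jobMaxParallelism != null ? jobMaxParallelism : serverMaxParallelism;
    if (enabled && inferredParallelism != null && inferredParallelism > 0) {
        return Math.min(inferredParallelism, max); // capped by max-parallelism
    }

    // 3. Fall back to the default parallelism.
    return 1;
}
```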

### 4.7 Persistence Configuration of IMap (This parameter is invalid on the Worker node)

:::tip

68 changes: 67 additions & 1 deletion docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -192,7 +192,73 @@ seatunnel:
classloader-cache-mode: true
```

- ### 4.6 Persistence Configuration of IMap (This parameter is invalid on the Worker node)
+ ### 4.6 Parallelism Inference Configuration (This parameter is invalid on the Worker node)

SeaTunnel Engine supports automatic parallelism inference for sources that implement the `SupportParallelismInference` interface (e.g., the Paimon connector). When enabled, the engine derives a suitable parallelism from the source's data characteristics instead of using a fixed value.

**enabled**

Whether to enable automatic parallelism inference. When enabled, sources that support inference automatically calculate a suitable parallelism from their data characteristics.

Default: `false`

**max-parallelism**

The upper limit on the inferred parallelism. Even if the source suggests a higher value, it is capped at this limit, which helps prevent resource over-use.

Default: `64`

**Server-Level Configuration Example**

Configure in `seatunnel.yaml`:

```yaml
seatunnel:
  engine:
    parallelism-inference:
      enabled: true
      max-parallelism: 100
```

**Job-Level Configuration Example**

Configure in the job config `env` block:

```hocon
env {
  # Enable parallelism inference
  parallelism.inference.enabled = true
  # Set maximum parallelism
  parallelism.inference.max-parallelism = 50
}

source {
  Paimon {
    warehouse = "hdfs://localhost:9000/paimon"
    database = "default"
    table = "users"
  }
}
```

> Review comment (Member) on lines +211 to +234: Add some priority notes

Note:

Parallelism configuration follows this priority order, from highest to lowest:

1. **Job-level `env.parallelism`**: if set in the job's `env` block, this value is used as-is.
2. **Parallelism inference**: if inference is enabled and the source supports it, the inferred parallelism is used (capped by `max-parallelism`). The inference settings themselves are resolved in two layers:
   - **Job level (`env` configuration) has higher priority**: `env.parallelism.inference.enabled` overrides `parallelism-inference.enabled` in `seatunnel.yaml`, and `env.parallelism.inference.max-parallelism` overrides `parallelism-inference.max-parallelism`.
   - **Server level (`seatunnel.yaml`) serves as the default**: it applies when the job configuration does not specify a value, and is suitable for setting a unified default behavior for all jobs.
3. **Default parallelism**: if none of the above is configured, the default value of `1` is used.


### 4.7 Persistence Configuration of IMap (This parameter is invalid on the Worker node)

:::tip

Expand Down
@@ -115,4 +115,19 @@ public class EnvCommonOptions {
                    .mapType()
                    .noDefaultValue()
                    .withDescription("Define the worker where the job runs by tag");

> Review comment (Member): New parameters need to be added in EnvOptionRule

    // ==================== Parallelism Inference Options ====================

    public static Option<Boolean> PARALLELISM_INFERENCE_ENABLED =
            Options.key("parallelism.inference.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Enable automatic parallelism inference.");

    public static Option<Integer> PARALLELISM_INFERENCE_MAX_PARALLELISM =
            Options.key("parallelism.inference.max-parallelism")
                    .intType()
                    .defaultValue(64)
                    .withDescription(
                            "The maximum parallelism for operators when using automatic inference.");
}
@@ -47,7 +47,9 @@ public OptionRule optionRule() {
                        EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
                        EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
                        EnvCommonOptions.CUSTOM_PARAMETERS,
-                       EnvCommonOptions.NODE_TAG_FILTER)
+                       EnvCommonOptions.NODE_TAG_FILTER,
+                       EnvCommonOptions.PARALLELISM_INFERENCE_ENABLED,
+                       EnvCommonOptions.PARALLELISM_INFERENCE_MAX_PARALLELISM)
                .build();
    }
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.source;

/**
 * Source connectors that support automatic parallelism inference should implement this
 * interface.
 */
public interface SupportParallelismInference {

    /** Infer the recommended parallelism for this source. */
    int inferParallelism();
}
@@ -25,6 +25,7 @@
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.source.SupportParallelismInference;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
@@ -42,7 +43,8 @@
public class FakeSource
        implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeSourceState>,
                SupportParallelism,
-               SupportColumnProjection {
+               SupportColumnProjection,
+               SupportParallelismInference {

    private JobContext jobContext;
    private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig;
@@ -98,4 +100,16 @@ public String getPluginName() {
    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    @Override
    public int inferParallelism() {
        // For testing purposes, infer parallelism based on the number of tables
        int tableCount = multipleTableFakeSourceConfig.getFakeConfigs().size();
        int inferredParallelism = Math.max(1, tableCount * 2);
        log.info(
                "FakeSource inferred parallelism: {} (based on {} tables)",
                inferredParallelism,
                tableCount);
        return inferredParallelism;
    }
}
@@ -26,6 +26,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelismInference;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -38,6 +39,7 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonStreamSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;

import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
@@ -57,7 +59,8 @@
/** Paimon connector source class. */
@Slf4j
public class PaimonSource
-       implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState> {
+       implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState>,
+               SupportParallelismInference {

    private static final long serialVersionUID = 1L;

@@ -179,4 +182,32 @@ public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> restoreEnumerator(
                readBuilders,
                1);
    }

    /** Infer parallelism based on Paimon data partition count. */
    @Override
    public int inferParallelism() {
        int inferParallelism = 0;
        try {
            for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
                String tableKey = entry.getKey();
                ReadBuilder readBuilder = entry.getValue();
                try {
                    List<PartitionEntry> partitionEntries =
                            readBuilder.newScan().listPartitionEntries();
                    inferParallelism += partitionEntries.size();
                } catch (Exception e) {
                    log.warn(
                            "Failed to get partition info for table {}, skipping parallelism inference",
                            tableKey,
                            e);
                    // A negative value signals that inference is unavailable for this source.
                    return -1;
                }
            }
        } catch (Exception e) {
            log.warn("Failed to infer parallelism for Paimon source", e);
            return -1;
        }
        // Unpartitioned tables yield zero partition entries; fall back to a parallelism of 1.
        return inferParallelism <= 0 ? 1 : inferParallelism;
    }
}

> Review comment (Member) on lines +188 to +212: Is there a problem with this design? Is there any recommended calculation rule for the parallelism of our Sink under normal circumstances? @Hisoka-X