5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
@@ -55,6 +55,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| field_ide | String | No | - |
| properties | Map | No | - |
| common-options | | No | - |
| server_time_zone | String | No | - |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode | Enum | No | APPEND_DATA |
| custom_sql | String | No | - |
@@ -93,6 +94,10 @@ For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'.

For Postgres 9.5 or below, please set it to `postgresLow` to support CDC

### server_time_zone [string]

The session time zone of the database server, for example `"Asia/Shanghai"` or `"UTC"`. It controls how `TIMESTAMP` columns are converted between the database and the JVM when using JDBC drivers (such as MySQL). If it is not set, the driver usually falls back to the JVM default time zone or its own defaults, which may produce hour-level offsets when the database server runs in a different time zone.
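To illustrate the risk this option guards against, here is a minimal standalone Java sketch (not SeaTunnel code) showing how the same instant renders as two different wall-clock timestamps depending on which zone the driver assumes:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class ServerTimeZoneDemo {
    public static void main(String[] args) {
        Instant instant = Instant.parse("2024-01-01T00:00:00Z");
        // The same instant, interpreted as a wall-clock TIMESTAMP in two zones:
        LocalDateTime asUtc = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
        LocalDateTime asShanghai = LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"));
        System.out.println(asUtc);      // 2024-01-01T00:00
        System.out.println(asShanghai); // 2024-01-01T08:00
    }
}
```

If the driver assumes the wrong zone, every `TIMESTAMP` value is shifted by the zone difference, which is exactly the hour-level offset described above.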

### dialect [string]

The specified dialect. If it is not set, the dialect is still inferred from the URL; when set, it takes priority over the URL. For example, when using StarRocks, you need to set it to `starrocks`; similarly, when using MySQL, set it to `mysql`.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Jdbc.md
@@ -74,6 +74,7 @@ supports query SQL and can achieve projection effect.
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| split.string_split_mode | String | No | sample | Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios. |
| split.string_split_mode_collate | String | No | - | Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used. |
| server_time_zone | String | No | - | The session time zone of the database server, for example `"Asia/Shanghai"` or `"UTC"`. It controls how `TIMESTAMP` columns are converted between the database and the JVM when using JDBC drivers (such as MySQL). If it is not set, the driver usually falls back to the JVM default time zone or its own defaults, which may produce hour-level offsets when the database server runs in a different time zone. |

### Table Matching

5 changes: 5 additions & 0 deletions docs/zh/connector-v2/sink/Jdbc.md
@@ -53,6 +53,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| field_ide | String | No | - |
| properties | Map | No | - |
| common-options | | No | - |
| server_time_zone | String | No | - |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode | Enum | No | APPEND_DATA |
| custom_sql | String | No | - |
@@ -90,6 +91,10 @@ The URL of the JDBC connection. Example: `jdbc:postgresql://localhost/test`

For Postgres 9.5 or below, please set it to `postgresLow` to support CDC

### server_time_zone [string]

The session time zone of the database server, for example `"Asia/Shanghai"` or `"UTC"`. It controls how `TIMESTAMP` columns are converted between the database and the JVM when using JDBC drivers (such as MySQL). If it is not set, the driver usually falls back to the JVM default time zone or its own defaults, which may produce hour-level offsets when the database server runs in a different time zone.

### dialect [string]

The specified dialect. If it is not set, the dialect is still inferred from the URL; when set, it takes priority over the URL. For example, when using StarRocks, you need to set it to `starrocks`; similarly, when using MySQL, set it to `mysql`.
7 changes: 7 additions & 0 deletions docs/zh/connector-v2/source/Jdbc.md
@@ -67,6 +67,13 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| table_list | Array | No | - | The list of tables to be read; you can use this configuration instead of `table_path` |
| where_condition | String | No | - | Common row filter condition for all tables/queries; must start with `where`. For example `where id > 100` |
| split.size | Int | No | 8096 | How many rows are in one split; captured tables are split into multiple splits when read. **Note**: this parameter only takes effect when the `table_path` parameter is used; it does not take effect with the `query` parameter. |
| split.even-distribution.factor.lower-bound | Double | No | 0.05 | Not recommended.<br/> The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the calculated distribution factor (i.e. (MAX(id) - MIN(id) + 1) / row count) is greater than or equal to this lower bound, the table chunks will be optimized for even distribution. Otherwise, if the distribution factor is less than this lower bound, the table is considered unevenly distributed, and the sample-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. |
| split.even-distribution.factor.upper-bound | Double | No | 100 | Not recommended.<br/> The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the calculated distribution factor (i.e. (MAX(id) - MIN(id) + 1) / row count) is less than or equal to this upper bound, the table chunks will be optimized for even distribution. Otherwise, if the distribution factor is greater than this upper bound, the table is considered unevenly distributed, and the sample-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. |
| split.sample-sharding.threshold | Int | No | 1000 | This configuration specifies the threshold on the estimated shard count that triggers the sample sharding strategy. When the distribution factor falls outside the range given by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This helps to process large datasets more efficiently. The default value is 1000 shards. |
| split.inverse-sampling.rate | Int | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, a 1/1000 sampling rate is applied during sampling. This option provides flexibility in controlling the sampling granularity and thus affects the final shard count. It is especially useful for very large datasets, where a lower sampling rate is preferred. The default value is 1000. |
| split.string_split_mode | String | No | sample | Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable the charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios. |
| split.string_split_mode_collate | String | No | - | Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used. |
| server_time_zone | String | No | - | The session time zone of the database server, for example `"Asia/Shanghai"` or `"UTC"`. It controls how `TIMESTAMP` columns are converted between the database and the JVM when using JDBC drivers (such as MySQL). If it is not set, the driver usually falls back to the JVM default time zone or its own defaults, which may produce hour-level offsets when the database server runs in a different time zone. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |

### Table Matching
@@ -163,6 +163,8 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
return convertToTime();
case TIMESTAMP:
return convertToTimestamp(serverTimeZone);
case TIMESTAMP_TZ:
return convertToTimestampTz(serverTimeZone);
case FLOAT:
return wrapNumericConverter(convertToFloat());
case DOUBLE:
@@ -415,6 +417,23 @@ public Object convert(Object dbzObj, Schema schema) {
};
}

private static DebeziumDeserializationConverter convertToTimestampTz(ZoneId serverTimeZone) {
DebeziumDeserializationConverter baseConverter = convertToTimestamp(serverTimeZone);
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Object value = baseConverter.convert(dbzObj, schema);
if (value == null) {
return null;
}
LocalDateTime localDateTime = (LocalDateTime) value;
return localDateTime.atZone(serverTimeZone).toOffsetDateTime();
}
};
}

@SuppressWarnings("MagicNumber")
public static LocalDateTime toLocalDateTime(long millisecond, int nanoOfMillisecond) {
// 86400000 = 24 * 60 * 60 * 1000
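The wrapping step added in `convertToTimestampTz` above can be sketched in isolation: the base converter yields a wall-clock `LocalDateTime`, and the configured server time zone supplies the offset. A minimal standalone sketch (the method name is illustrative, not the connector's API):

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

public class TimestampTzSketch {
    // Mirrors the wrapping step: attach the server zone to the wall-clock
    // value, then drop the zone rules and keep only the fixed offset.
    static OffsetDateTime toOffset(LocalDateTime wallClock, ZoneId serverTimeZone) {
        return wallClock.atZone(serverTimeZone).toOffsetDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime wall = LocalDateTime.of(2024, 6, 1, 12, 0);
        System.out.println(toOffset(wall, ZoneId.of("Asia/Shanghai"))); // 2024-06-01T12:00+08:00
    }
}
```

Using `atZone(...).toOffsetDateTime()` preserves the wall-clock fields and records the offset that was in effect in the server zone at that instant.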
@@ -136,6 +136,9 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
default:
break;
}
// For CDC, we intentionally use the default MySqlTypeConverter instance which treats
// TIMESTAMP as a local "wall clock" timestamp. Timezone-aware semantics are handled by
// Debezium itself via its temporal converters and the configured server time zone.
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
}
}
@@ -37,6 +37,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;

@Slf4j
public class DefaultDataConverter implements DataConverter<Object[]> {
@@ -101,6 +102,9 @@ public SeaTunnelRow convert(Object[] values, TiTableInfo tableInfo, SeaTunnelRow
case TIMESTAMP:
fields[fieldIndex] = convertToTimestamp(value, dataType);
break;
case TIMESTAMP_TZ:
fields[fieldIndex] = convertToTimestampTz(value, dataType);
break;
case BYTES:
fields[fieldIndex] = convertToBinary(value);
break;
@@ -293,6 +297,15 @@ private static Object convertToTimestamp(
return value;
}

private static Object convertToTimestampTz(
Object value, org.tikv.common.types.DataType dataType) {
LocalDateTime localDateTime = (LocalDateTime) convertToTimestamp(value, dataType);
if (localDateTime == null) {
return null;
}
return localDateTime.atZone(ZoneId.systemDefault()).toOffsetDateTime();
}

public static LocalDateTime toLocalDateTime(long millisecond, int nanoOfMillisecond) {
// 86400000 = 24 * 60 * 60 * 1000
int date = (int) (millisecond / 86400000);
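The `toLocalDateTime(millisecond, nanoOfMillisecond)` helper above splits epoch millis into whole days plus time of day, which is what the `86400000 = 24 * 60 * 60 * 1000` comment hints at. A hedged reconstruction of that arithmetic (the body is truncated in the diff, so this is a sketch assuming non-negative inputs):

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

public class ToLocalDateTimeSketch {
    // Split epoch millis into whole epoch days and the remaining time of day,
    // then fold the sub-millisecond nanos into the nano-of-day component.
    public static LocalDateTime toLocalDateTime(long millisecond, int nanoOfMillisecond) {
        int date = (int) (millisecond / 86400000);          // 86400000 = 24 * 60 * 60 * 1000
        int millisOfDay = (int) (millisecond % 86400000);
        return LocalDateTime.of(
                LocalDate.ofEpochDay(date),
                LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L + nanoOfMillisecond));
    }

    public static void main(String[] args) {
        System.out.println(toLocalDateTime(0L, 0));                      // 1970-01-01T00:00
        System.out.println(toLocalDateTime(86400000L + 3600000L, 0));    // 1970-01-02T01:00
    }
}
```

Note this sketch would misbehave for negative epoch millis (pre-1970 values), where `/` and `%` in Java round toward zero; the real helper may handle that case differently.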
@@ -279,6 +279,7 @@ public BasicTypeDefine<EsType> reconvert(Column column) {
break;
case DATE:
case TIMESTAMP:
case TIMESTAMP_TZ:
Map<String, Object> option = new HashMap<>();
if (column.getScale() != null && column.getScale() > 3) {
option.put("format", "strict_date_optional_time||epoch_millis");
@@ -29,6 +29,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -86,6 +87,9 @@ private static FieldFormatter createFieldFormatter(
case TIMESTAMP:
LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
return localDateTime.toString();
case TIMESTAMP_TZ:
OffsetDateTime offsetDateTime = (OffsetDateTime) row.getField(fieldIndex);
return offsetDateTime.toString();
default:
return row.getField(fieldIndex).toString();
}
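The `TIMESTAMP_TZ` branch above relies on `OffsetDateTime.toString()`, which emits ISO-8601 text including the offset, so the zone information survives the string formatting. A small standalone check:

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class OffsetDateTimeStringDemo {
    public static void main(String[] args) {
        // toString() on OffsetDateTime produces ISO-8601 with the offset suffix.
        OffsetDateTime odt = OffsetDateTime.of(2024, 6, 1, 12, 30, 45, 0, ZoneOffset.ofHoursMinutes(5, 30));
        System.out.println(odt); // 2024-06-01T12:30:45+05:30
    }
}
```

This contrasts with the plain `TIMESTAMP` branch, where `LocalDateTime.toString()` carries no offset at all.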
@@ -89,6 +89,7 @@ public static Schema convertToSchema(SeaTunnelDataType<?> dataType, String rowNa
Schema binary = SchemaBuilder.builder().bytesType();
return nullableSchema(binary);
case TIMESTAMP:
case TIMESTAMP_TZ:
// use long to represents Timestamp
LogicalType avroLogicalType;
avroLogicalType = LogicalTypes.timestampMillis();
@@ -34,6 +34,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
@@ -162,6 +163,17 @@ public Object convert(Schema schema, Object object) {
}
};
break;
case TIMESTAMP_TZ:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(Schema schema, Object object) {
return ((OffsetDateTime) object).toInstant().toEpochMilli();
}
};
break;
case DECIMAL:
converter =
new RowDataToAvroConverter() {
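The Avro converter above collapses a `TIMESTAMP_TZ` value to epoch milliseconds via `toInstant().toEpochMilli()`, matching the `timestamp-millis` logical type chosen in the schema conversion. The offset is normalized away: two representations of the same instant yield the same long. A standalone sketch:

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class AvroTimestampTzSketch {
    public static void main(String[] args) {
        // The same instant written with two different offsets collapses to
        // one epoch-millis value, as the timestamp-millis logical type expects.
        OffsetDateTime utc = OffsetDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
        OffsetDateTime cst = OffsetDateTime.of(2024, 1, 1, 8, 0, 0, 0, ZoneOffset.ofHours(8));
        System.out.println(utc.toInstant().toEpochMilli()); // 1704067200000
        System.out.println(cst.toInstant().toEpochMilli()); // 1704067200000
    }
}
```

The trade-off is that the original offset is not recoverable from the Avro value alone; readers get an instant, not a zoned timestamp.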
@@ -173,6 +173,7 @@ public static Type toIcebergType(SeaTunnelDataType dataType, AtomicInteger nextI
case TIME:
return Types.TimeType.get();
case TIMESTAMP:
case TIMESTAMP_TZ:
return Types.TimestampType.withZone();
case STRING:
default:
@@ -144,6 +144,16 @@ public class JdbcCommonOptions {
public static final Option<String> REGION =
Options.key("region").stringType().noDefaultValue().withDescription("region");

public static final Option<String> SERVER_TIME_ZONE =
Options.key("server_time_zone")
.stringType()
.noDefaultValue()
.withFallbackKeys("serverTimeZone")
.withDescription(
"The session time zone of the database server, for example: \"Asia/Shanghai\". "
+ "It controls how the TIMESTAMP column is converted to string/temporal types. "
+ "If it is not set, ZoneId.systemDefault() is used.");

public static final OptionRule.Builder BASE_CATALOG_RULE =
OptionRule.builder()
.required(URL)
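Per the option description above, a plausible resolution of `server_time_zone` into a `ZoneId` looks like the following sketch (the helper name is hypothetical, not the connector's actual code):

```java
import java.time.ZoneId;

public class ServerTimeZoneOption {
    // Use the configured zone when present; otherwise fall back to
    // ZoneId.systemDefault(), as the option description states.
    static ZoneId resolveServerTimeZone(String configured) {
        return configured == null ? ZoneId.systemDefault() : ZoneId.of(configured);
    }

    public static void main(String[] args) {
        System.out.println(resolveServerTimeZone("Asia/Shanghai")); // Asia/Shanghai
    }
}
```

`ZoneId.of` accepts both region IDs like `Asia/Shanghai` and fixed offsets like `+08:00`, and throws `DateTimeException` for unknown IDs, so invalid configuration fails fast.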