Changes from all commits — 44 commits
2bd4b46
Fix bug where hbase reader task field values are null
Sep 9, 2020
a01f3b2
Update mysqlwriter.md
orrrpuse Dec 17, 2022
cbb8196
Merge branch 'alibaba:master' into mysqlwriter_doc_update
orrrpuse Mar 23, 2023
5c040c2
Update mysqlwriter.md
orrrpuse Mar 23, 2023
5d4b55a
update: add column parameter description to the adbpgwriter component doc
Mar 31, 2023
9d5a82b
Update README.md
jiaoyang3 Oct 12, 2023
b0224dc
[BugFix] Fix the checkPath method in the Configuration class not taking effect
saligia-tju Jun 26, 2024
0ec7677
add obhbase reader and writer plugin by cjyyz64
TrafalgarLuo Aug 20, 2024
de8498a
Merge pull request #1954 from jiaoyang3/patch-1
TrafalgarLuo Aug 20, 2024
f5314d5
Merge pull request #1741 from chlin-enginner/updateDoc/adbPgWriterDoc
TrafalgarLuo Aug 20, 2024
9fe9f9f
Merge pull request #1641 from orrrpuse/mysqlwriter_doc_update
TrafalgarLuo Aug 20, 2024
d9f5477
Merge pull request #2144 from saligia-tju/master
TrafalgarLuo Aug 20, 2024
3614c26
fix obhbase reader and writer package error
TrafalgarLuo Aug 20, 2024
269f973
[bugfix] Fix gaussdbreader and gaussdbwriter packaging
yanghaiming01 Aug 23, 2024
7bb69f6
gen default parquet schema if config not set
Oct 25, 2024
5ac1e62
fix: data cannot be viewed after writing timestamp type
Arlowen Nov 26, 2024
906bf3d
add milvus writer plugin
nianliuu Nov 27, 2024
b3c1c07
to #62452156 feat:add milvus writer
Feb 8, 2025
14068ce
to #62452156 feat:add milvus writer remove stream2milvus.json
Feb 8, 2025
14c81a9
to #62452156 feat:add milvus writer remove milvusjob.json
Feb 8, 2025
7940b4f
Merge pull request #2269 from ziming-ai/add-milvus-writer
dingxiaobo Feb 8, 2025
0c8a726
fix: fix defects in the OceanBase plugin
xxsc0529 Feb 13, 2025
b612516
Merge pull request #2271 from xxsc0529/master
dingxiaobo Feb 18, 2025
ee487b0
Merge pull request #2245 from mylemons/master
LitteCandy0511 Mar 28, 2025
cbb098f
Merge pull request #821 from lawlessluo/master
LitteCandy0511 Mar 28, 2025
129924c
Merge pull request #2232 from 7owen/master
LitteCandy0511 Mar 31, 2025
8478f7a
feature:oceanbase plugin add direct path support
Apr 3, 2025
91e599b
feature:oceanbase plugin add direct path support
Apr 3, 2025
f95319a
feature:add oceanbase doc
Apr 3, 2025
d82a5ae
Merge pull request #2283 from xxsc0529/master
dingxiaobo Apr 3, 2025
d40ef26
fix:direct path date type processing
Apr 10, 2025
0824b45
Merge pull request #2286 from xxsc0529/master
dingxiaobo Apr 10, 2025
8e88961
Merge branch 'master' into master
saligia-tju Apr 22, 2025
37915a9
fix:oceanbase datasource support special characters
xxsc0529 May 7, 2025
1bc342e
Merge remote-tracking branch 'origin/master'
xxsc0529 May 7, 2025
947e441
Merge pull request #2292 from xxsc0529/master
dingxiaobo May 7, 2025
c1e34c9
fix:oceanbase datasource support special characters
xxsc0529 May 27, 2025
18cf572
Merge pull request #2302 from xxsc0529/master
dingxiaobo May 27, 2025
4554981
fix:solve the problem of increasing or losing data in incremental sit…
xxsc0529 Jun 20, 2025
452fc91
Merge branch 'alibaba:master' into master
xxsc0529 Jun 20, 2025
1f850d3
fix:solve the problem of increasing or losing data in incremental sit…
xxsc0529 Jun 20, 2025
2c1c527
Merge remote-tracking branch 'origin/master'
xxsc0529 Jun 20, 2025
c5f37f0
Merge pull request #2312 from xxsc0529/master
dingxiaobo Jun 20, 2025
60ea07b
Merge pull request #2194 from saligia-tju/master
LitteCandy0511 Jul 1, 2025
4 changes: 2 additions & 2 deletions README.md
@@ -41,7 +41,7 @@ DataX already has a fairly comprehensive plugin ecosystem; mainstream RDBMS databases, N
|--------------|---------------------------|:---------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) |
| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) |
| | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) |
| | OceanBase | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oceanbasev10reader/doc/oceanbasev10reader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oceanbasev10writer/doc/oceanbasev10writer.md) |
| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) |
| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) |
| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
@@ -108,7 +108,7 @@ DataX already has a fairly comprehensive plugin ecosystem; mainstream RDBMS databases, N

# 重要版本更新说明

DataX plans monthly iterative updates going forward, and interested contributors are welcome to submit Pull requests; the monthly update contents will be introduced introduced as follows
DataX plans monthly iterative updates going forward, and interested contributors are welcome to submit Pull requests; the monthly update contents are as follows

- [datax_v202309](https://github.com/alibaba/DataX/releases/tag/datax_v202309)
- Support adding where conditions to Phoenix data sync
3 changes: 2 additions & 1 deletion adbpgwriter/src/main/doc/adbpgwriter.md
@@ -149,6 +149,7 @@ The COPY command writes data into the ADB PG database.

Note: 1. We strongly discourage configuring it this way, because when the target table's field count, types, etc. change, your job may run incorrectly or fail
2. column must not be configured with any constant values here
3. For uppercase field names, there is no need to prepend the escape sequence \" when configuring here

* Required: yes <br />

@@ -229,4 +230,4 @@ create table schematest.test_datax (

#### 4.2.2 Performance test summary
1. `The number of channels has a large impact on performance`
2. `When writing to the database, more than 32 channels is generally not recommended`
2. `When writing to the database, more than 32 channels is generally not recommended`
@@ -56,7 +56,7 @@ public Adb4pgClientProxy(Configuration configuration,TaskPluginCollector taskPl
int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000);
databaseConfig.setRetryIntervalTime(retryIntervalTime);

// Set the SQL length for auto-commit (in bytes); defaults to 32KB, generally not recommended to set
// Set the SQL length for auto-commit (in bytes); defaults to 10MB, generally not recommended to set
int commitSize = configuration.getInt("commitSize", 10 * 1024 * 1024);
databaseConfig.setCommitSize(commitSize);
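The corrected comment now matches the code: the fallback `10 * 1024 * 1024` is 10 MB, not 32 KB. A sketch of the defaulted lookup, with DataX's `Configuration.getInt` simplified to a plain `Map` (the names here are illustrative, not the real API):

```java
import java.util.HashMap;
import java.util.Map;

public class CommitSizeSketch {
    // Simplified stand-in for Configuration.getInt(key, defaultValue):
    // return the configured value if present, else the default.
    static int getInt(Map<String, Integer> conf, String key, int defaultValue) {
        Integer v = conf.get(key);
        return v == null ? defaultValue : v;
    }

    public static void main(String[] args) {
        Map<String, Integer> conf = new HashMap<>();
        // "commitSize" unset -> falls back to 10 * 1024 * 1024 = 10 MB
        int commitSize = getInt(conf, "commitSize", 10 * 1024 * 1024);
        System.out.println(commitSize == 10_485_760); // true
    }
}
```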

@@ -1047,7 +1047,7 @@ private void checkPath(final String path) {
"系统编程错误, 该异常代表系统编程错误, 请联系DataX开发团队!.");
}

for (final String each : StringUtils.split(".")) {
for (final String each : StringUtils.split(path, ".")) {
if (StringUtils.isBlank(each)) {
throw new IllegalArgumentException(String.format(
"系统编程错误, 路径[%s]不合法, 路径层次之间不能出现空白字符 .", path));
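The one-line fix above is easy to miss: Commons Lang's `StringUtils.split(".")` splits the literal string `"."` on whitespace, so the loop never saw `path` at all and the validation was dead code. A minimal stdlib sketch of the intended check, using `String.split` in place of Commons Lang (a slight behavioral difference: `String.split` keeps interior empty tokens, while `StringUtils.split(path, ".")` drops them and only whitespace-only segments trip `StringUtils.isBlank`):

```java
public class CheckPathSketch {
    // Validate that no segment of a dot-separated configuration path
    // is blank — what Configuration.checkPath is meant to enforce.
    static void checkPath(String path) {
        for (String each : path.split("\\.")) {
            if (each.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "invalid path [" + path + "]: blank segment");
            }
        }
    }

    public static void main(String[] args) {
        checkPath("job.content.reader");   // valid: no exception
        boolean caught = false;
        try {
            checkPath("job. .reader");     // whitespace-only segment
        } catch (IllegalArgumentException e) {
            caught = true;
        }
        System.out.println(caught);        // true
    }
}
```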
17 changes: 8 additions & 9 deletions core/src/main/job/job.json
@@ -2,41 +2,40 @@
"job": {
"setting": {
"speed": {
"channel":1
"channel": 2
},
"errorLimit": {
"record": 0,
"percentage": 0.02
"record": 0
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
"column": [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"value": 1724154616370,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"value": "2024-01-01 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"value": "TestRawData",
"type": "bytes"
}
],
"sliceRecordCount": 100000
"sliceRecordCount": 100
}
},
"writer": {
@@ -49,4 +48,4 @@
}
]
}
}
}
4 changes: 1 addition & 3 deletions doriswriter/doc/doriswriter.md
@@ -36,8 +36,6 @@ DorisWriter imports data via Doris's natively supported Stream load; DorisWriter
"name": "doriswriter",
"parameter": {
"loadUrl": ["172.16.0.13:8030"],
"loadProps": {
},
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
"username": "root",
"password": "xxxxxx",
@@ -178,4 +176,4 @@ DorisWriter imports data via Doris's natively supported Stream load; DorisWriter
}
```

For more information, see the Doris official site: [Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
For more information, see the Doris official site: [Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
77 changes: 1 addition & 76 deletions elasticsearchwriter/doc/elasticsearchwriter.md
@@ -167,79 +167,4 @@
* dynamic
* Description: do not use datax's mappings; use es's own automatic mappings
* Required: no
* Default: false



## 4 Performance Report

### 4.1 Environment Setup

* Total data volume: 10 million records, 0.1 KB each
* 1 shard, 0 replicas
* No id, which defaults to append_only mode: versions are not checked and insert speed improves by roughly 20%

#### 4.1.1 Input data types (streamreader)

```
{"value": "1.1.1.1", "type": "string"},
{"value": 19890604.0, "type": "double"},
{"value": 19890604, "type": "long"},
{"value": 19890604, "type": "long"},
{"value": "hello world", "type": "string"},
{"value": "hello world", "type": "string"},
{"value": "41.12,-71.34", "type": "string"},
{"value": "2017-05-25", "type": "string"},
```

#### 4.1.2 Output data types (eswriter)

```
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date"}
```

#### 4.1.2 Machine specs

1. cpu: 32 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
2. mem: 128G
3. net: dual gigabit NICs

#### 4.1.3 DataX JVM parameters

-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError

### 4.2 Test Report

| Channels | Batch commit rows | DataX speed (Rec/s) | DataX throughput (MB/s) |
|--------|--------| --------|--------|
| 4| 256| 11013| 0.828|
| 4| 1024| 19417| 1.43|
| 4| 4096| 23923| 1.76|
| 4| 8172| 24449| 1.80|
| 8| 256| 21459| 1.58|
| 8| 1024| 37037| 2.72|
| 8| 4096| 45454| 3.34|
| 8| 8172| 45871| 3.37|
| 16| 1024| 67567| 4.96|
| 16| 4096| 78125| 5.74|
| 16| 8172| 77519| 5.69|
| 32| 1024| 94339| 6.93|
| 32| 4096| 96153| 7.06|
| 64| 1024| 91743| 6.74|

### 4.3 Test Summary

* The best result was 32 channels with a batch size of 4096; if individual records are large, reduce the batch size appropriately to prevent OOM
* This also scales horizontally easily, and since es is distributed, configuring more shards scales horizontally too

## 5 Constraints

* If ids are imported, failed imports are retried, and re-importing merely overwrites, ensuring data consistency
* If ids are not imported, append_only mode is used: elasticsearch generates ids automatically and speed improves by roughly 20%, but data cannot be repaired; suitable for log-type data (where precision requirements are low)
* Default: false
@@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.reader.gaussdbwriter;
package com.alibaba.datax.plugin.writer.gaussdbwriter;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
@@ -86,16 +86,18 @@ private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) {
column = new LongColumn((Integer) value);
break;
case Types.TINYINT:
column = new LongColumn(((Byte) value).longValue());
Byte aByte = (Byte) value;
column = new LongColumn(null == aByte ? null : aByte.longValue());
break;
case Types.SMALLINT:
column = new LongColumn(((Short) value).longValue());
Short aShort = (Short) value;
column = new LongColumn(null == aShort ? null : aShort.longValue());
break;
case Types.BIGINT:
column = new LongColumn((Long) value);
break;
case Types.FLOAT:
column = new DoubleColumn((Float.valueOf(value.toString())));
column = new DoubleColumn(null == value ? null : (Float.valueOf(value.toString())));
break;
case Types.DECIMAL:
column = new DoubleColumn((BigDecimal)value);
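The pattern in this hunk guards against an NPE: casting a null `Object` to `Byte` succeeds, but unboxing it via `longValue()` throws. A standalone sketch of the guard, with DataX's `LongColumn`/`DoubleColumn` simplified to plain boxed types for illustration:

```java
public class NullSafeConvert {
    // Old code: ((Byte) value).longValue() — NullPointerException when the
    // Phoenix cell is null. Casting null is legal; unboxing it is not.
    static Long byteToLong(Object value) {
        Byte aByte = (Byte) value;
        return null == aByte ? null : aByte.longValue();
    }

    // Same guard for the FLOAT branch: value.toString() on null would NPE.
    static Double floatToDouble(Object value) {
        return null == value ? null : Double.valueOf(value.toString());
    }

    public static void main(String[] args) {
        System.out.println(byteToLong((byte) 7));  // 7
        System.out.println(byteToLong(null));      // null
        System.out.println(floatToDouble(1.5f));   // 1.5
        System.out.println(floatToDouble(null));   // null
    }
}
```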
@@ -31,6 +31,7 @@
import parquet.schema.*;

import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;

@@ -440,7 +441,7 @@ public List<ObjectInspector> getColumnTypeInspectors(List<Configuration> column
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Double.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case TIMESTAMP:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Timestamp.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(org.apache.hadoop.hive.common.type.Timestamp.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case DATE:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Date.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
@@ -533,7 +534,13 @@ public static MutablePair<List<Object>, Boolean> transportOneRecord(
recordList.add(new java.sql.Date(column.asDate().getTime()));
break;
case TIMESTAMP:
recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
Date date = column.asDate();
if (date == null) {
recordList.add(null);
} else {
Timestamp ts = new Timestamp(date.getTime());
recordList.add(org.apache.hadoop.hive.common.type.Timestamp.ofEpochMilli(ts.getTime(), ts.getNanos()));
}
break;
default:
throw DataXException
@@ -630,7 +637,14 @@ public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration con
MessageType messageType = null;
ParquetFileProccessor proccessor = null;
Path outputPath = new Path(fileName);
String schema = config.getString(Key.PARQUET_SCHEMA);
String schema = config.getString(Key.PARQUET_SCHEMA, null);
if (schema == null) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
if (columns == null || columns.isEmpty()) {
throw DataXException.asDataXException("parquetSchema or column can't be blank!");
}
schema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns);
}
try {
messageType = MessageTypeParser.parseMessageType(schema);
} catch (Exception e) {
@@ -228,6 +228,12 @@ public List<Configuration> split(int mandatoryNumber) {
String endFullFileName = null;

fileSuffix = UUID.randomUUID().toString().replace('-', '_');
if (fileType.equalsIgnoreCase("PARQUET")) {
if (StringUtils.isNotBlank(this.compress)) {
fileSuffix += "." + this.compress.toLowerCase();
}
fileSuffix += ".parquet";
}

fullFileName = String.format("%s%s%s__%s", defaultFS, storePath, filePrefix, fileSuffix);
endFullFileName = String.format("%s%s%s__%s", defaultFS, endStorePath, filePrefix, fileSuffix);
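The suffix hunk above gives Parquet output files names ending in, e.g., `.snappy.parquet`. A self-contained sketch of that construction, with Commons Lang's `StringUtils.isNotBlank` replaced by a stdlib check:

```java
import java.util.UUID;

public class ParquetSuffixSketch {
    // Mirrors the hunk: a UUID with '-' replaced by '_', then an optional
    // compression codec, then ".parquet" when the file type is PARQUET.
    static String buildSuffix(String fileType, String compress) {
        String fileSuffix = UUID.randomUUID().toString().replace('-', '_');
        if ("PARQUET".equalsIgnoreCase(fileType)) {
            if (compress != null && !compress.trim().isEmpty()) {
                fileSuffix += "." + compress.toLowerCase();
            }
            fileSuffix += ".parquet";
        }
        return fileSuffix;
    }

    public static void main(String[] args) {
        System.out.println(buildSuffix("parquet", "SNAPPY").endsWith(".snappy.parquet")); // true
        System.out.println(buildSuffix("PARQUET", null).endsWith(".parquet"));            // true
        System.out.println(buildSuffix("orc", "SNAPPY").contains("."));                   // false
    }
}
```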