diff --git a/kuduwriter/src/main/doc/kuduwriter.md b/kuduwriter/src/main/doc/kuduwriter.md deleted file mode 100644 index 44bef23502..0000000000 --- a/kuduwriter/src/main/doc/kuduwriter.md +++ /dev/null @@ -1,143 +0,0 @@ -# datax-kudu-plugin -datax kudu的writer插件 - - - -eg: - -```json -{ - "name": "kudu11xwriter", - "parameter": { - "kuduConfig": { - "kudu.master_addresses": "***", - "timeout": 60000, - "sessionTimeout": 60000 - - }, - "table": "", - "replicaCount": 3, - "truncate": false, - "writeMode": "upsert", - "partition": { - "range": { - "column1": [ - { - "lower": "2020-08-25", - "upper": "2020-08-26" - }, - { - "lower": "2020-08-26", - "upper": "2020-08-27" - }, - { - "lower": "2020-08-27", - "upper": "2020-08-28" - } - ] - }, - "hash": { - "column": [ - "column1" - ], - "number": 3 - } - }, - "column": [ - { - "index": 0, - "name": "c1", - "type": "string", - "primaryKey": true - }, - { - "index": 1, - "name": "c2", - "type": "string", - "compress": "DEFAULT_COMPRESSION", - "encoding": "AUTO_ENCODING", - "comment": "注解xxxx" - } - ], - "batchSize": 1024, - "bufferSize": 2048, - "skipFail": false, - "encoding": "UTF-8" - } -} -``` - -必须参数: - -```json - "writer": { - "name": "kudu11xwriter", - "parameter": { - "kuduConfig": { - "kudu.master_addresses": "***" - }, - "table": "***", - "column": [ - { - "name": "c1", - "type": "string", - "primaryKey": true - }, - { - "name": "c2", - "type": "string", - }, - { - "name": "c3", - "type": "string" - }, - { - "name": "c4", - "type": "string" - } - ] - } - } -``` - -主键列请写到最前面 - - - -![image-20200901193148188](C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200901193148188.png) - -##### 配置列表 - -| name | default | description | 是否必须 | -| -------------- | ------------------- | ------------------------------------------------------------ | -------- | -| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 | -| table | | 导入目标表名 | 是 | -| partition | | 分区 | 否 | -| column | | 列 | 是 | -| name | | 列名 | 是 | -| type | | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 是 | -| index | 升序排列 | 列索引位置,如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 | -| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 | -| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 | -| encoding | AUTO_ENCODING | 编码 | 否 | -| replicaCount | 3 | 保留副本个数 | 否 | -| hash | | hash分区 | 否 | -| number | 3 | hash分区个数 | 否 | -| range | | range分区 | 否 | -| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 | -| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 | -| truncate | false | 是否清空表,本质上是删表重建 | 否 | -| writeMode | upsert | upsert,insert,update | 否 | -| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 | -| bufferSize | 3072 | 缓冲区大小 | 否 | -| skipFail | false | 是否跳过插入不成功的数据 | 否 | -| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 | -| sessionTimeout | 60000 | session超时时间 单位:ms | 否 | - - - - - - - - diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java index 5dc9b8ca2a..105688201e 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java @@ -58,7 +58,7 @@ public static KuduClient getKuduClient(String kuduConfig) { return kuduClient; } - public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration configuration, KuduClient kuduClient) { + public static KuduTable getKuduTable(Configuration configuration, KuduClient kuduClient) { String tableName = configuration.getString(Key.TABLE); KuduTable table = null; @@ -89,7 +89,7 @@ public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration return table; } - public static void createTable(com.alibaba.datax.common.util.Configuration configuration) { + public static void createTable(Configuration configuration) { String tableName = configuration.getString(Key.TABLE); String kuduConfig = configuration.getString(Key.KUDU_CONFIG); KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); @@ -135,7 +135,7 @@ public static void createTable(com.alibaba.datax.common.util.Configuration confi } } - public static boolean isTableExists(com.alibaba.datax.common.util.Configuration configuration) { + public static boolean isTableExists(Configuration configuration) { String tableName = configuration.getString(Key.TABLE); String kuduConfig = configuration.getString(Key.KUDU_CONFIG); KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); @@ -160,7 +160,7 @@ public static void closeClient(KuduClient kuduClient) { } - public static Schema getSchema(com.alibaba.datax.common.util.Configuration configuration) { + public static Schema getSchema(Configuration configuration) { List columns = configuration.getListConfiguration(Key.COLUMN); List columnSchemas = new ArrayList<>(); Schema schema = null; @@ -206,7 +206,7 @@ public static Integer getPrimaryKeyIndexUntil(List columns){ return i; } - public static void setTablePartition(com.alibaba.datax.common.util.Configuration configuration, + public static void setTablePartition(Configuration configuration, CreateTableOptions tableOptions, Schema schema) { Configuration partition = configuration.getConfiguration(Key.PARTITION); @@ -243,7 +243,7 @@ public static void setTablePartition(com.alibaba.datax.common.util.Configuration } } - public static void validateParameter(com.alibaba.datax.common.util.Configuration configuration) { + public static void validateParameter(Configuration configuration) { configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE); configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE); String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); @@ -268,6 +268,7 @@ public static void validateParameter(com.alibaba.datax.common.util.Configuration Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false); configuration.set(Key.SKIP_FAIL, isSkipFail); + LOG.info("==validate parameter complete!"); } public static void truncateTable(Configuration configuration) { diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java index 694f97a29c..d46bcea31b 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java @@ -29,11 +29,11 @@ public enum Kudu11xWriterErrorcode implements ErrorCode { } @Override public String getCode() { - return null; + return code; } @Override public String getDescription() { - return null; + return description; } } diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java index e8e83896a4..127ee0c107 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -13,6 +13,9 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,13 +40,14 @@ public class KuduWriterTask { private Integer primaryKeyIndexUntil; - public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration) { + public KuduWriterTask(Configuration configuration) { this.columns = configuration.getListConfiguration(Key.COLUMN); this.encoding = configuration.getString(Key.ENCODING); this.insertMode = configuration.getString(Key.INSERT_MODE); this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE); this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE); this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert"); + this.isSkipFail = configuration.getBool(Key.SKIP_FAIL); this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG)); this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient); @@ -55,6 +59,7 @@ public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration) } public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) { + LOG.info("==kuduwriter began to write!"); Record record; AtomicLong counter = new AtomicLong(0L); try { @@ -146,7 +151,8 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu } } } catch (Exception e) { - throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e); + LOG.error("write failed! the task will exit!"); + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage()); } AtomicInteger i = new AtomicInteger(10); try { @@ -170,6 +176,7 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu try { session.flush(); } catch (KuduException e) { + LOG.error("==kuduwriter flush error! the results may not be complete!"); e.printStackTrace(); } } diff --git a/kuduwriter/src/test/java/com/dai/test.java b/kuduwriter/src/test/java/com/dai/test.java index ba4ceecd9e..5fd17beba6 100644 --- a/kuduwriter/src/test/java/com/dai/test.java +++ b/kuduwriter/src/test/java/com/dai/test.java @@ -1,14 +1,8 @@ package com.dai; -import com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xHelper; -import org.junit.Test; -import com.q1.datax.plugin.writer.kudu11xwriter.ColumnType; -import com.q1.datax.plugin.writer.kudu11xwriter.InsertModeType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicInteger; - +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.RetryUtil; +import com.q1.datax.plugin.writer.kudu11xwriter.*; import static org.apache.kudu.client.AsyncKuduClient.LOG; /** @@ -16,8 +10,31 @@ * @create 2020-08-28 11:03 **/ public class test { - @Test - public void kuduTypeTest() { + static boolean isSkipFail; + + + public static void main(String[] args) { + try { + while (true) { + try { + RetryUtil.executeWithRetry(()->{ + throw new RuntimeException(); + },5,1000L,true); + } catch (Exception e) { + LOG.error("Data write failed!", e); + System.out.println(isSkipFail); + if (isSkipFail) { + LOG.warn("Because you have configured skipFail is true,this data will be skipped!"); + }else { + System.out.println("异常抛出"); + throw e; + } + } + } + } catch (Exception e) { + LOG.error("write failed! the task will exit!"); + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e); + } } }