Skip to content

Commit

Permalink
kuduwriter_branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-KIDBK committed Sep 28, 2020
1 parent d6daf9c commit 098eb9f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 164 deletions.
143 changes: 0 additions & 143 deletions kuduwriter/src/main/doc/kuduwriter.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static KuduClient getKuduClient(String kuduConfig) {
return kuduClient;
}

public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration configuration, KuduClient kuduClient) {
public static KuduTable getKuduTable(Configuration configuration, KuduClient kuduClient) {
String tableName = configuration.getString(Key.TABLE);

KuduTable table = null;
Expand Down Expand Up @@ -89,7 +89,7 @@ public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration
return table;
}

public static void createTable(com.alibaba.datax.common.util.Configuration configuration) {
public static void createTable(Configuration configuration) {
String tableName = configuration.getString(Key.TABLE);
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
Expand Down Expand Up @@ -135,7 +135,7 @@ public static void createTable(com.alibaba.datax.common.util.Configuration confi
}
}

public static boolean isTableExists(com.alibaba.datax.common.util.Configuration configuration) {
public static boolean isTableExists(Configuration configuration) {
String tableName = configuration.getString(Key.TABLE);
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
Expand All @@ -160,7 +160,7 @@ public static void closeClient(KuduClient kuduClient) {

}

public static Schema getSchema(com.alibaba.datax.common.util.Configuration configuration) {
public static Schema getSchema(Configuration configuration) {
List<Configuration> columns = configuration.getListConfiguration(Key.COLUMN);
List<ColumnSchema> columnSchemas = new ArrayList<>();
Schema schema = null;
Expand Down Expand Up @@ -206,7 +206,7 @@ public static Integer getPrimaryKeyIndexUntil(List<Configuration> columns){
return i;
}

public static void setTablePartition(com.alibaba.datax.common.util.Configuration configuration,
public static void setTablePartition(Configuration configuration,
CreateTableOptions tableOptions,
Schema schema) {
Configuration partition = configuration.getConfiguration(Key.PARTITION);
Expand Down Expand Up @@ -243,7 +243,7 @@ public static void setTablePartition(com.alibaba.datax.common.util.Configuration
}
}

public static void validateParameter(com.alibaba.datax.common.util.Configuration configuration) {
public static void validateParameter(Configuration configuration) {
configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
Expand All @@ -268,6 +268,7 @@ public static void validateParameter(com.alibaba.datax.common.util.Configuration

Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
configuration.set(Key.SKIP_FAIL, isSkipFail);
LOG.info("==validate parameter complete!");
}

public static void truncateTable(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public enum Kudu11xWriterErrorcode implements ErrorCode {
}
@Override
public String getCode() {
return null;
return code;
}

@Override
public String getDescription() {
return null;
return description;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -37,13 +40,14 @@ public class KuduWriterTask {
private Integer primaryKeyIndexUntil;


public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration) {
public KuduWriterTask(Configuration configuration) {
this.columns = configuration.getListConfiguration(Key.COLUMN);
this.encoding = configuration.getString(Key.ENCODING);
this.insertMode = configuration.getString(Key.INSERT_MODE);
this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert");
this.isSkipFail = configuration.getBool(Key.SKIP_FAIL);

this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
Expand All @@ -55,6 +59,7 @@ public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration)
}

public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
LOG.info("==kuduwriter began to write!");
Record record;
AtomicLong counter = new AtomicLong(0L);
try {
Expand Down Expand Up @@ -146,7 +151,8 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
}
}
} catch (Exception e) {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e);
LOG.error("write failed! the task will exit!");
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
}
AtomicInteger i = new AtomicInteger(10);
try {
Expand All @@ -170,6 +176,7 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
try {
session.flush();
} catch (KuduException e) {
LOG.error("==kuduwriter flush error! the results may not be complete!");
e.printStackTrace();
}
}
Expand Down
39 changes: 28 additions & 11 deletions kuduwriter/src/test/java/com/dai/test.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
package com.dai;

import com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xHelper;
import org.junit.Test;
import com.q1.datax.plugin.writer.kudu11xwriter.ColumnType;
import com.q1.datax.plugin.writer.kudu11xwriter.InsertModeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.RetryUtil;
import com.q1.datax.plugin.writer.kudu11xwriter.*;
import static org.apache.kudu.client.AsyncKuduClient.LOG;

/**
* @author daizihao
* @create 2020-08-28 11:03
**/
public class test {
@Test
public void kuduTypeTest() {
static boolean isSkipFail;


public static void main(String[] args) {
try {
while (true) {
try {
RetryUtil.executeWithRetry(()->{
throw new RuntimeException();
},5,1000L,true);

} catch (Exception e) {
LOG.error("Data write failed!", e);
System.out.println(isSkipFail);
if (isSkipFail) {
LOG.warn("Because you have configured skipFail is true,this data will be skipped!");
}else {
System.out.println("异常抛出");
throw e;
}
}
}
} catch (Exception e) {
LOG.error("write failed! the task will exit!");
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e);
}
}
}

0 comments on commit 098eb9f

Please sign in to comment.