Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Flink and Spark connector support String type #7075

Merged
merged 23 commits into from
Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4ee37b4
[Bug]:fix when data null , throw NullPointerException
Jul 6, 2021
0d191a6
[Bug]:Distinguish between null and empty string
Jul 6, 2021
16ece83
Merge branch 'apache:master' into master
JNSimba Jul 10, 2021
0e9d01d
Merge branch 'apache:master' into master
JNSimba Jul 15, 2021
ecda935
[Feature]:flink-connector supports streamload parameters
Jul 15, 2021
c312404
[Fix]:code style
Jul 20, 2021
cbd2144
Merge branch 'apache:master' into master
JNSimba Jul 26, 2021
4a5f7ae
Merge branch 'apache:master' into master
JNSimba Aug 5, 2021
e5bd427
Merge branch 'apache:master' into master
JNSimba Aug 11, 2021
e914c5d
Merge branch 'apache:master' into master
JNSimba Aug 12, 2021
4b85175
Merge branch 'apache:master' into master
JNSimba Aug 13, 2021
b3845a1
Merge branch 'apache:master' into master
JNSimba Aug 24, 2021
187f311
Merge branch 'apache:master' into master
JNSimba Sep 18, 2021
b39be53
Merge branch 'apache:master' into master
JNSimba Sep 23, 2021
e0eb0e8
Merge branch 'apache:master' into master
JNSimba Sep 24, 2021
787ceaf
Merge branch 'apache:master' into master
JNSimba Sep 25, 2021
34bb351
Merge branch 'apache:master' into master
JNSimba Sep 27, 2021
4661c76
Merge branch 'apache:master' into master
JNSimba Oct 21, 2021
2d50b57
Merge branch 'apache:master' into master
JNSimba Nov 1, 2021
35181f2
Merge branch 'apache:master' into master
JNSimba Nov 3, 2021
e20f90a
Merge branch 'apache:master' into master
JNSimba Nov 4, 2021
3bd7924
[Feature] flink and spark connector support String
Nov 10, 2021
fd364d3
[Fix] remove unuse thrift
Nov 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Feature]:flink-connector supports streamload parameters
  • Loading branch information
wudi committed Jul 15, 2021
commit ecda9354a6f20a5de55d823f80d82df6741d8f6f
1 change: 1 addition & 0 deletions docs/en/extending-doris/flink-doris-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| sink.batch.size | 100 | Maximum number of lines in a single write BE |
| sink.max-retries | 1 | Number of retries after writing BE failed |
| sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
| sink.properties.* | -- | The StreamLoad parameters, e.g. 'sink.properties.column_separator' = ','. |


## Doris & Flink Column Type Mapping
Expand Down
1 change: 1 addition & 0 deletions docs/zh-CN/extending-doris/flink-doris-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| sink.batch.size | 100 | 单次写BE的最大行数 |
| sink.max-retries | 1 | 写BE失败之后的重试次数 |
| sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ',' 等 |



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;
import java.time.Duration;
import java.util.Properties;

/**
* JDBC sink batch options.
Expand All @@ -32,11 +33,15 @@ public class DorisExecutionOptions implements Serializable {
private final Integer maxRetries;
private final Long batchIntervalMs;

public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
/** Properties for the StreamLoad. */
private final Properties streamLoadProp;

/**
 * Creates execution options for the Doris sink.
 *
 * @param batchSize       maximum number of rows buffered before a flush
 * @param maxRetries      number of retries after a failed write; must be >= 0
 * @param batchIntervalMs flush interval in milliseconds; 0 disables periodic flushing
 * @param streamLoadProp  raw StreamLoad parameters (e.g. column_separator); may be null
 */
public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
    Preconditions.checkArgument(maxRetries >= 0);
    this.batchSize = batchSize;
    this.maxRetries = maxRetries;
    this.batchIntervalMs = batchIntervalMs;
    // Fall back to empty Properties: downstream code calls
    // getStreamLoadProp().getProperty(...) unconditionally and would NPE on null
    // (e.g. when the Builder is used without setStreamLoadProp).
    this.streamLoadProp = streamLoadProp == null ? new Properties() : streamLoadProp;
}

public Integer getBatchSize() {
Expand All @@ -51,6 +56,10 @@ public Long getBatchIntervalMs() {
return batchIntervalMs;
}

/** Returns the StreamLoad parameters attached to each load request (as set on the Builder). */
public Properties getStreamLoadProp() {
return streamLoadProp;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -62,6 +71,7 @@ public static class Builder {
private Integer batchSize;
private Integer maxRetries;
private Long batchIntervalMs;
private Properties streamLoadProp;

public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
Expand All @@ -78,8 +88,13 @@ public Builder setBatchIntervalMs(Long batchIntervalMs) {
return this;
}

/**
 * Sets the raw StreamLoad parameters (e.g. column_separator, line_delimiter)
 * to forward with each load request.
 *
 * @param streamLoadProp the StreamLoad properties
 * @return this builder, for chaining
 */
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}

public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs,streamLoadProp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -47,14 +48,21 @@
public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
private static final String FIELD_DELIMITER_KEY = "column_separator";
private static final String FIELD_DELIMITER_DEFAULT = "\t";
private static final String LINE_DELIMITER_KEY = "line_delimiter";
private static final String LINE_DELIMITER_DEFAULT = "\n";
private static final String NULL_VALUE = "\\N";
private final String fieldDelimiter;
private final String lineDelimiter;

private DorisOptions options ;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisStreamLoad dorisStreamLoad;
private final String fieldDelimiter = "\t";
private final String lineDelimiter = "\n";
private final String NULL_VALUE = "\\N";



private final List<String> batch = new ArrayList<>();
private transient volatile boolean closed = false;

Expand All @@ -66,21 +74,23 @@ public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,LINE_DELIMITER_DEFAULT);
}

@Override
public void configure(Configuration configuration) {

}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
dorisStreamLoad = new DorisStreamLoad(
dorisStreamLoad = new DorisStreamLoad(
getBackend(),
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword());
options.getPassword(),
executionOptions.getStreamLoadProp());
LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());

if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
Expand Down Expand Up @@ -123,7 +133,7 @@ private void addBatch(RowData row) {
if(field != null){
value.add(field.toString());
}else{
value.add(this.NULL_VALUE);
value.add(NULL_VALUE);
}
}
batch.add(value.toString());
Expand Down Expand Up @@ -156,7 +166,7 @@ public synchronized void flush() throws IOException {
}
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(String.join(lineDelimiter,batch));
dorisStreamLoad.load(String.join(this.lineDelimiter,batch));
batch.clear();
break;
} catch (StreamLoadException e) {
Expand Down Expand Up @@ -202,8 +212,8 @@ public static Builder builder() {
*/
public static class Builder {
private DorisOptions.Builder optionsBuilder;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;

public Builder() {
this.optionsBuilder = DorisOptions.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
Expand Down Expand Up @@ -149,6 +151,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor
"default value is 1s.");


// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";

@Override
public String factoryIdentifier() {
return "doris"; // used for matching to `connector = '...'`
Expand Down Expand Up @@ -234,27 +239,39 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
return builder.build();
}

private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig) {
/**
 * Builds the sink execution options from the validated table config plus the
 * already-extracted StreamLoad properties.
 */
private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig, Properties streamLoadProp) {
    return DorisExecutionOptions.builder()
            .setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS))
            .setMaxRetries(readableConfig.get(SINK_MAX_RETRIES))
            .setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis())
            .setStreamLoadProp(streamLoadProp)
            .build();
}

/**
 * Extracts StreamLoad-specific parameters from the table options.
 *
 * <p>Every option whose key starts with {@code sink.properties.} is copied into the
 * returned {@link Properties} with the prefix stripped, so e.g.
 * {@code sink.properties.column_separator} becomes {@code column_separator}.
 *
 * @param tableOptions all options declared on the catalog table
 * @return the prefix-stripped StreamLoad properties (empty if none declared)
 */
private Properties getStreamLoadProp(Map<String, String> tableOptions) {
    final Properties streamLoadProp = new Properties();
    for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
        String key = entry.getKey();
        if (key.startsWith(STREAM_LOAD_PROP_PREFIX)) {
            String subKey = key.substring(STREAM_LOAD_PROP_PREFIX.length());
            // setProperty (not raw put) keeps the Properties String-only contract.
            streamLoadProp.setProperty(subKey, entry.getValue());
        }
    }
    return streamLoadProp;
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// validate all options
helper.validate();
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);

Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
// create and return dynamic table source
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions())
getDorisExecutionOptions(helper.getOptions(),streamLoadProp)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.doris.flink.table;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.slf4j.Logger;
Expand All @@ -36,6 +37,8 @@
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

/**
Expand All @@ -54,15 +57,17 @@ public class DorisStreamLoad implements Serializable{
private String db;
private String tbl;
private String authEncoding;
private Properties streamLoadProp;

public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
/**
 * Creates a StreamLoad client bound to one FE/BE host.
 *
 * @param hostPort       target host:port the load URL is built against
 * @param db             target database name
 * @param tbl            target table name
 * @param user           Doris user for HTTP basic auth
 * @param passwd         Doris password for HTTP basic auth
 * @param streamLoadProp extra StreamLoad parameters sent as HTTP headers on each request
 */
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd,Properties streamLoadProp) {
this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
this.user = user;
this.passwd = passwd;
// Pre-render the load URL once; it is fixed for the lifetime of this client.
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
// Pre-encode "user:passwd" for the Basic auth header (UTF-8, as Doris expects).
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
this.streamLoadProp = streamLoadProp;
}

public String getLoadUrlStr() {
Expand All @@ -89,6 +94,9 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
for(Map.Entry<Object, Object> entry : streamLoadProp.entrySet()){
conn.addRequestProperty(String.valueOf(entry.getKey()),String.valueOf(entry.getValue()));
}
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
Expand Down Expand Up @@ -133,11 +141,15 @@ public void load(String value) throws StreamLoadException {
}

private LoadResponse loadBatch(String value) {
Calendar calendar = Calendar.getInstance();
String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));
String label = streamLoadProp.getProperty("label");
if(StringUtils.isBlank(label)){
Calendar calendar = Calendar.getInstance();
label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));

}

HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
Expand Down