Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Flink and Spark connector support String type #7075

Merged
merged 23 commits into from
Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4ee37b4
[Bug]:fix when data null , throw NullPointerException
Jul 6, 2021
0d191a6
[Bug]:Distinguish between null and empty string
Jul 6, 2021
16ece83
Merge branch 'apache:master' into master
JNSimba Jul 10, 2021
0e9d01d
Merge branch 'apache:master' into master
JNSimba Jul 15, 2021
ecda935
[Feature]:flink-connector supports streamload parameters
Jul 15, 2021
c312404
[Fix]:code style
Jul 20, 2021
cbd2144
Merge branch 'apache:master' into master
JNSimba Jul 26, 2021
4a5f7ae
Merge branch 'apache:master' into master
JNSimba Aug 5, 2021
e5bd427
Merge branch 'apache:master' into master
JNSimba Aug 11, 2021
e914c5d
Merge branch 'apache:master' into master
JNSimba Aug 12, 2021
4b85175
Merge branch 'apache:master' into master
JNSimba Aug 13, 2021
b3845a1
Merge branch 'apache:master' into master
JNSimba Aug 24, 2021
187f311
Merge branch 'apache:master' into master
JNSimba Sep 18, 2021
b39be53
Merge branch 'apache:master' into master
JNSimba Sep 23, 2021
e0eb0e8
Merge branch 'apache:master' into master
JNSimba Sep 24, 2021
787ceaf
Merge branch 'apache:master' into master
JNSimba Sep 25, 2021
34bb351
Merge branch 'apache:master' into master
JNSimba Sep 27, 2021
4661c76
Merge branch 'apache:master' into master
JNSimba Oct 21, 2021
2d50b57
Merge branch 'apache:master' into master
JNSimba Nov 1, 2021
35181f2
Merge branch 'apache:master' into master
JNSimba Nov 3, 2021
e20f90a
Merge branch 'apache:master' into master
JNSimba Nov 4, 2021
3bd7924
[Feature] flink and spark connector support String
Nov 10, 2021
fd364d3
[Fix] remove unuse thrift
Nov 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Fix]:code style
  • Loading branch information
wudi committed Jul 20, 2021
commit c31240446d81800091d7ddc146af33eb6fde650c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private void close() {

/**
* Open a scanner for reading Doris data.
*
* @param openParams thrift struct to required by request
* @return scan open result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
Expand Down Expand Up @@ -147,6 +148,7 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedF

/**
* get next row batch from Doris BE
*
* @param nextBatchParams thrift struct to required by request
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
Expand All @@ -161,7 +163,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
result = client.get_next(nextBatchParams);
result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
Expand Down Expand Up @@ -189,6 +191,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor

/**
* close an scanner.
*
* @param closeParams thrift struct to required by request
*/
public void closeScanner(TScanCloseParams closeParams) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.Serializable;

/**
* Doris connection options.
* Doris connection options.
*/
public class DorisConnectionOptions implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@
/**
* JDBC sink batch options.
*/
public class DorisExecutionOptions implements Serializable {
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;

private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;

/** Properties for the StreamLoad. */
/**
* Properties for the StreamLoad.
*/
private final Properties streamLoadProp;

public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs,Properties streamLoadProp) {
public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
Expand Down Expand Up @@ -94,7 +96,7 @@ public Builder setStreamLoadProp(Properties streamLoadProp) {
}

public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs,streamLoadProp);
return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Options for the Doris connector.
*/
public class DorisOptions extends DorisConnectionOptions{
public class DorisOptions extends DorisConnectionOptions {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* Doris read Options
*/
public class DorisReadOptions implements Serializable {
public class DorisReadOptions implements Serializable {

private static final long serialVersionUID = 1L;

Expand All @@ -35,7 +35,7 @@ public class DorisReadOptions implements Serializable {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
private Integer deserializeQueueSize;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;

public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs,
Expand Down Expand Up @@ -117,7 +117,7 @@ public static class Builder {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
private Integer deserializeQueueSize;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;


Expand Down Expand Up @@ -177,7 +177,7 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
}

public DorisReadOptions build() {
return new DorisReadOptions(readFields,filterQuery,requestTabletSize,requestConnectTimeoutMs,requestReadTimeoutMs,requestQueryTimeoutS,requestRetries,requestBatchSize,execMemLimit,deserializeQueueSize,deserializeArrowAsync);
return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);

private DorisDeserializationSchema deserializer;
private DorisOptions options;
private DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;

public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
Expand All @@ -55,14 +55,14 @@ public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializatio
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.dorisPartitions = RestService.findPartitions(options,readOptions,logger);
this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
}

@Override
public void run(SourceContext sourceContext) throws Exception{
for(PartitionDefinition partitions : dorisPartitions){
scalaValueReader = new ScalaValueReader(partitions, options,readOptions);
while (scalaValueReader.hasNext()){
public void run(SourceContext sourceContext) throws Exception {
for (PartitionDefinition partitions : dorisPartitions) {
scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
while (scalaValueReader.hasNext()) {
Object next = scalaValueReader.next();
sourceContext.collect(next);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@


import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.List;


public class SimpleListDeserializationSchema implements DorisDeserializationSchema{
public class SimpleListDeserializationSchema implements DorisDeserializationSchema {

@Override
public TypeInformation getProducedType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ public class DorisException extends Exception {
public DorisException() {
super();
}

public DorisException(String message) {
super(message);
}

public DorisException(String message, Throwable cause) {
super(message, cause);
}

public DorisException(Throwable cause) {
super(cause);
}

protected DorisException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

package org.apache.doris.flink.exception;

public class ShouldNeverHappenException extends DorisException { }
public class ShouldNeverHappenException extends DorisException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ public class StreamLoadException extends Exception {
public StreamLoadException() {
super();
}

public StreamLoadException(String message) {
super(message);
}

public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}

public StreamLoadException(Throwable cause) {
super(cause);
}

protected StreamLoadException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public int compareTo(PartitionDefinition o) {
similar.retainAll(o.tabletIds);
diffSelf.removeAll(similar);
diffOther.removeAll(similar);
if (diffSelf.size() == 0) {
if (diffSelf.size() == 0) {
return 0;
}
long diff = Collections.min(diffSelf) - Collections.min(diffOther);
Expand Down
Loading