Commit f39a5bc

[Feature] Spark connector supports specifying fields to write (apache#6973)

By default, the Spark connector writes the values of all fields to the Doris table.
With this feature, the user can write only a subset of the fields, and can also specify the order in which the fields are written.

For example, suppose there is a table named `student` with three columns (name, gender, age), created with the following SQL:
```sql
create table student (name varchar(255), gender varchar(10), age int) duplicate key (name) distributed by hash(name) buckets 2;
```
Now suppose only two of the columns, name and gender, should be written. The code is as follows:
```scala
    val df = spark.createDataFrame(Seq(
      ("m", "zhangsan"),
      ("f", "lisi"),
      ("m", "wangwu")
    ))
    df.write
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("doris.table.identifier", dorisTable)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      // specify the fields to write, and their order
      .option("doris.write.fields", "gender,name")
      .save()
```
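Under the hood (see the `DorisStreamLoad` change below), the connector forwards this list as the `columns` property of the Stream Load request, so the two incoming values map to `gender` and `name`, while `age` falls back to the schema's default/NULL handling. As a quick sanity check, the table can be read back through the connector; a minimal sketch that reuses the variables from the example above:

```scala
// Read the table back through the connector.
// Expectation: name and gender are populated, age is NULL.
val readBack = spark.read
  .format("doris")
  .option("doris.fenodes", dorisFeNodes)
  .option("doris.table.identifier", dorisTable)
  .option("user", dorisUser)
  .option("password", dorisPwd)
  .load()

readBack.show()
```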
chovy-3012 authored Nov 2, 2021
1 parent aba7d2c commit f39a5bc
Showing 9 changed files with 171 additions and 231 deletions.
7 changes: 7 additions & 0 deletions docs/en/extending-doris/spark-doris-connector.md
@@ -137,6 +137,9 @@ mockDataDF.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.save()

## stream sink (StructuredStreaming)
@@ -154,6 +157,9 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
```
@@ -175,6 +181,7 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
| doris.exec.mem.limit | 2147483648 | Memory limit for a single query. The default is 2GB, in bytes. |
| doris.deserialize.arrow.async | false | Whether to support asynchronous conversion of Arrow format to RowBatch required for spark-doris-connector iteration |
| doris.deserialize.queue.size | 64 | Asynchronous conversion of the internal processing queue in Arrow format takes effect when doris.deserialize.arrow.async is true |
| doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br/>By default, all fields are written in the order of the Doris table schema. |

### SQL & Dataframe Configuration

7 changes: 7 additions & 0 deletions docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -139,6 +139,9 @@ mockDataDF.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.save()

## stream sink (StructuredStreaming)
@@ -156,6 +159,9 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
```
@@ -179,6 +185,7 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
| doris.exec.mem.limit | 2147483648 | Memory limit for a single query. The default is 2GB, in bytes |
| doris.deserialize.arrow.async | false | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for spark-doris-connector iteration |
| doris.deserialize.queue.size | 64 | Internal processing queue for asynchronous conversion of the Arrow format; takes effect when doris.deserialize.arrow.async is true |
| doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br />By default, all fields are written in the order of the Doris table schema. |

### SQL & Dataframe-specific Configuration

DorisStreamLoad.java
@@ -62,6 +62,7 @@ public class DorisStreamLoad implements Serializable{
private String db;
private String tbl;
private String authEncoding;
private String columns;

public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
this.hostPort = hostPort;
@@ -83,6 +84,7 @@ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisExceptio
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
}

public String getLoadUrlStr() {
@@ -108,6 +110,9 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
if (columns != null && !columns.equals("")) {
conn.addRequestProperty("columns", columns);
}
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
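For context, the `columns` property added above surfaces directly in the Stream Load HTTP request that the connector issues. A rough sketch of such a request for the `student` example, with host, database, label, and row data all illustrative (rows are tab-separated by default):

```http
PUT /api/example_db/student/_stream_load HTTP/1.1
Host: doris-fe-host:8030
Authorization: Basic <base64 of user:passwd>
Expect: 100-continue
Content-Type: text/plain; charset=UTF-8
label: spark_streamload_example_label
columns: gender,name

m	zhangsan
f	lisi
m	wangwu
```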
ConfigurationOptions.java
@@ -63,4 +63,6 @@ public interface ConfigurationOptions {

String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;

String DORIS_WRITE_FIELDS = "doris.write.fields";
}
RestService.java
@@ -31,27 +31,19 @@
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.HashMap;
import java.util.Base64;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
@@ -60,17 +52,22 @@
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.doris.spark.rest.models.*;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -86,8 +83,7 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
private static final String BACKENDS = "/rest/v1/system?path=//backends";

private static final String BACKENDS = "/api/show_proc?path=//backends";

/**
* send request to Doris FE and get response json string.
@@ -114,37 +110,36 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger)
.build();

request.setConfig(requestConfig);

String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");

CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(user, password));
HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credentialsProvider);
logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);

IOException ex = null;
int statusCode = -1;

for (int attempt = 0; attempt < retries; attempt++) {
CloseableHttpClient httpClient = HttpClients.createDefault();
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
String response;
if (request instanceof HttpGet){
response = getConnectionGet(request.getURI().toString(), user, password,logger);
} else {
response = getConnectionPost(request,user, password,logger);
}
if (response == null) {
CloseableHttpResponse response = httpClient.execute(request, context);
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
request.getURI(), statusCode);
continue;
}
String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
logger.trace("Success get response from Doris FE: {}, response is: {}.",
request.getURI(), response);
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(response, Map.class);
//Handle the problem of inconsistent data format returned by http v1 and v2
if (map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
return mapper.writeValueAsString(data);
} else {
return response;
}
request.getURI(), res);
return res;
} catch (IOException e) {
ex = e;
logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -155,54 +150,6 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger)
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}

private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
URL realUrl = new URL(request);
// open connection
HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);

connection.connect();
return parseResponse(connection,logger);
}

private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
if (connection.getResponseCode() != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris {}, http code is {}",
connection.getURL(), connection.getResponseCode());
throw new IOException("Failed to get response from Doris");
}
String result = "";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
String line;
while ((line = in.readLine()) != null) {
result += line;
}
if (in != null) {
in.close();
}
return result;
}

private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
InputStream content = ((HttpPost)request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
PrintWriter out = new PrintWriter(conn.getOutputStream());
// send request params
out.print(res);
// flush
out.flush();
// read response
return parseResponse(conn,logger);
}
/**
* parse table identifier to array.
* @param tableIdentifier table identifier string
@@ -502,9 +449,9 @@ public static String randomBackend(SparkSettings sparkSettings , Logger logger)

static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
Backend backend;
List<List<String>> backend;
try {
backend = mapper.readValue(response, Backend.class);
backend = mapper.readValue(response, List.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " + response;
logger.error(errMsg, e);
@@ -523,7 +470,13 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
List<BackendRow> backendRows = backend.stream().map(array -> {
BackendRow backendRow = new BackendRow();
backendRow.setIP(array.get(2));
backendRow.setHttpPort(array.get(6));
backendRow.setAlive(Boolean.parseBoolean(array.get(10)));
return backendRow;
}).filter(v -> v.getAlive()).collect(Collectors.toList());
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
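Note that this index-based mapping bakes in an assumed layout for the `show_proc` response: a JSON array of rows, each a list of strings in which position 2 holds the BE IP, position 6 the HTTP port, and position 10 the alive flag. A hypothetical row consistent with those indices (all values illustrative):

```json
[
  ["10002", "default_cluster", "172.16.0.11", "doris-be-1", "9050",
   "9060", "8040", "8060", "2021-11-01 10:00:00", "2021-11-02 10:00:00",
   "true", "..."]
]
```

This avoids going through a dedicated response model (the parsing no longer uses `Backend.getRows()`), at the cost of coupling the connector to the column order of the proc output.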
@@ -541,7 +494,7 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
*/
@VisibleForTesting
static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan, String database, String table, Logger logger)
String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
List<PartitionDefinition> partitions = new ArrayList<>();

This file was deleted.

This file was deleted.
