Commit 8d47100

[Feature] support spark connector sink stream data to doris (apache#6761)

* [Feature] support spark connector sink stream data to doris

* [Doc] Add spark-connector batch/stream writing instructions

* add license and remove meaningless blanks in code

Co-authored-by: wei.zhao <wei.zhao@aispeech.com>
chovy-3012 and wei.zhao authored Sep 28, 2021
1 parent df5ba6b commit 8d47100
Showing 11 changed files with 418 additions and 61 deletions.
48 changes: 43 additions & 5 deletions docs/en/extending-doris/spark-doris-connector.md
@@ -26,9 +26,10 @@ under the License.

# Spark Doris Connector

-Spark Doris Connector can support reading data stored in Doris through Spark.
+Spark Doris Connector supports reading data stored in Doris and writing data to Doris through Spark.

-- The current version only supports reading data from `Doris`.
+- Supports reading data from `Doris`.
+- Supports batch and streaming writes of a `Spark DataFrame` to `Doris`.
 - You can map a `Doris` table to a `DataFrame` or an `RDD`; using `DataFrame` is recommended.
 - Supports data filtering on the `Doris` side to reduce the amount of data transferred.

@@ -57,8 +58,9 @@ sh build.sh 2 ## spark 2.x version, the default is 2.3.4
After successful compilation, the file `doris-spark-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder; for `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.
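
To quickly confirm the jar is visible on the driver classpath, you can resolve one of the connector's classes from `spark-shell` (a minimal sketch; `org.apache.doris.spark.DorisStreamLoad` is one of the connector classes shown in this commit):

```scala
// Throws ClassNotFoundException if the connector jar was not picked up.
Class.forName("org.apache.doris.spark.DorisStreamLoad")
```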

## Example
+### Read

-### SQL
+#### SQL

```sql
CREATE TEMPORARY VIEW spark_doris
@@ -73,7 +75,7 @@ OPTIONS(
SELECT * FROM spark_doris;
```

-### DataFrame
+#### DataFrame

```scala
val dorisSparkDF = spark.read.format("doris")
@@ -86,7 +88,7 @@ val dorisSparkDF = spark.read.format("doris")
dorisSparkDF.show(5)
```

-### RDD
+#### RDD

```scala
import org.apache.doris.spark._
@@ -101,6 +103,42 @@ val dorisSparkRDD = sc.dorisRDD(

dorisSparkRDD.collect()
```

### Write

#### DataFrame (batch/stream)
```scala
// batch sink
val mockDataDF = List(
  (3, "440403001005", "21.cn"),
  (1, "4404030013005", "22.cn"),
  (33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .save()

// stream sink (Structured Streaming)
val kafkaSource = spark.readStream
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("startingOffsets", "latest")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .format("kafka")
  .load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .start()
  .awaitTermination()
```
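
To sanity-check the batch write, one option is to read the table back through the same connector (a minimal sketch reusing the read options documented above; all `$YOUR_*` values are placeholders):

```scala
// Read back the rows just written through the connector's read path.
val writtenDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()
writtenDF.show(5)
```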

## Configuration

53 changes: 48 additions & 5 deletions docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -26,9 +26,10 @@ under the License.

# Spark Doris Connector

-Spark Doris Connector can support reading data stored in Doris through Spark.
+Spark Doris Connector supports reading data stored in Doris through Spark, and also supports writing data to Doris through Spark.

-- The current version only supports reading data from `Doris`.
+- Supports reading data from `Doris`.
+- Supports batch/streaming writes of a `Spark DataFrame` to `Doris`.
 - You can map a `Doris` table to a `DataFrame` or an `RDD`; using `DataFrame` is recommended.
 - Supports data filtering on the `Doris` side to reduce the amount of data transferred.

@@ -57,8 +58,9 @@ sh build.sh 2 ## spark 2.x version, the default is 2.3.4
After successful compilation, the file `doris-spark-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder; for `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.

## Usage Example
+### Read

-### SQL
+#### SQL

```sql
CREATE TEMPORARY VIEW spark_doris
@@ -73,7 +75,7 @@ OPTIONS(
SELECT * FROM spark_doris;
```

-### DataFrame
+#### DataFrame

```scala
val dorisSparkDF = spark.read.format("doris")
@@ -86,7 +88,7 @@ val dorisSparkDF = spark.read.format("doris")
dorisSparkDF.show(5)
```

-### RDD
+#### RDD

```scala
import org.apache.doris.spark._
@@ -102,6 +104,47 @@ val dorisSparkRDD = sc.dorisRDD(
dorisSparkRDD.collect()
```

### Write

#### DataFrame (batch/stream)

```scala
// batch sink
val mockDataDF = List(
  (3, "440403001005", "21.cn"),
  (1, "4404030013005", "22.cn"),
  (33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .save()

// stream sink (Structured Streaming)
val kafkaSource = spark.readStream
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("startingOffsets", "latest")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .format("kafka")
  .load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .start()
  .awaitTermination()
```



## Configuration

### General Configuration Options
6 changes: 6 additions & 0 deletions extension/spark-doris-connector/pom.xml
@@ -139,6 +139,12 @@
</exclusions>
<scope>test</scope>
</dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>test</scope>
        </dependency>
</dependencies>

<build>
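
Note: the `spark-sql-kafka-0-10` dependency above is declared with `test` scope, so it only serves the connector's own tests. A production job like the Kafka-to-Doris streaming example in the docs must still supply the Kafka connector at runtime (for example via `spark-submit --packages` or by bundling it), since the Doris connector jar does not ship it.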
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.DorisException;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * A cached Stream Load client for each partition.
 */
public class CachedDorisStreamLoadClient {
    private static final long cacheExpireTimeout = 30 * 60;
    private static LoadingCache<SparkSettings, DorisStreamLoad> dorisStreamLoadLoadingCache;

    static {
        dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
                .expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Object, Object>() {
                    @Override
                    public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
                        // do nothing
                    }
                })
                .build(new CacheLoader<SparkSettings, DorisStreamLoad>() {
                    @Override
                    public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
                        return new DorisStreamLoad(sparkSettings);
                    }
                });
    }

    public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
        return dorisStreamLoadLoadingCache.get(settings);
    }
}
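
For context, here is a rough sketch of how a partition-level writer might use this cache; `getOrCreate` and `DorisStreamLoad.load(List<List<Object>>)` are from this commit, while the surrounding helper is purely illustrative:

```scala
import scala.collection.JavaConverters._
import org.apache.doris.spark.CachedDorisStreamLoadClient
import org.apache.doris.spark.cfg.SparkSettings

// Hypothetical partition writer: fetch the cached client once per task,
// then hand it a batch of rows to send via Stream Load.
def writePartition(settings: SparkSettings, rows: Iterator[Seq[Any]]): Unit = {
  val client = CachedDorisStreamLoadClient.getOrCreate(settings)
  val batch = rows
    .map(row => row.map(_.asInstanceOf[Object]).asJava)
    .toList.asJava
  client.load(batch)
}
```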
@@ -17,7 +17,11 @@
package org.apache.doris.spark;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +40,16 @@
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;

/**
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable {
    public static final String FIELD_DELIMITER = "\t";
    public static final String LINE_DELIMITER = "\n";
    public static final String NULL_VALUE = "\\N";

private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);

@@ -65,6 +73,18 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
}

    public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
        this.hostPort = RestService.randomBackend(settings, LOG);
        String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
        this.db = dbTable[0];
        this.tbl = dbTable[1];
        this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
        this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    public String getLoadUrlStr() {
        return loadUrlStr;
    }
@@ -84,7 +104,6 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
-       String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,6 +133,22 @@ public String toString() {
}
}

    public void load(List<List<Object>> rows) throws StreamLoadException {
        StringJoiner lines = new StringJoiner(LINE_DELIMITER);
        for (List<Object> row : rows) {
            StringJoiner line = new StringJoiner(FIELD_DELIMITER);
            for (Object field : row) {
                if (field == null) {
                    line.add(NULL_VALUE);
                } else {
                    line.add(field.toString());
                }
            }
            lines.add(line.toString());
        }
        load(lines.toString());
    }

    public void load(String value) throws StreamLoadException {
        LoadResponse loadResponse = loadBatch(value);
        LOG.info("Streamload Response: {}", loadResponse);
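
The `load(List<List<Object>>)` overload above fixes the Stream Load wire format: fields are joined with `FIELD_DELIMITER` (`\t`), rows with `LINE_DELIMITER` (`\n`), and nulls are rendered as `\N`. A minimal sketch of the same encoding, using the mock rows from the docs:

```scala
// Mirrors the TSV encoding performed by DorisStreamLoad.load(List<List<Object>>).
val rows = Seq(Seq[Any](1, "4404030013005", "22.cn"), Seq[Any](33, null, "23.cn"))
val payload = rows
  .map(_.map(f => if (f == null) "\\N" else f.toString).mkString("\t"))
  .mkString("\n")
// payload == "1\t4404030013005\t22.cn\n33\t\\N\t23.cn"
```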
@@ -98,4 +98,17 @@ public String save() throws IllegalArgumentException {
        Properties copy = asProperties();
        return IOUtils.propsToString(copy);
    }

    @Override
    public int hashCode() {
        return asProperties().hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Settings)) {
            return false;
        }
        return asProperties().equals(((Settings) obj).asProperties());
    }
}
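
These overrides matter because `CachedDorisStreamLoadClient` keys its Guava `LoadingCache` on `SparkSettings`: without value-based `equals`/`hashCode`, every lookup would miss and build a new client. A rough sketch of the intended invariant (this assumes a `SparkSettings(SparkConf)` constructor; the property value is hypothetical):

```scala
import org.apache.spark.SparkConf
import org.apache.doris.spark.cfg.SparkSettings

// Two settings objects carrying identical properties compare equal,
// so both resolve to the same cached DorisStreamLoad instance.
val a = new SparkSettings(new SparkConf().set("doris.fenodes", "fe-host:8030"))
val b = new SparkSettings(new SparkConf().set("doris.fenodes", "fe-host:8030"))
assert(a == b && a.hashCode == b.hashCode)
```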
@@ -24,9 +24,10 @@
import com.google.common.base.Preconditions;

 import scala.Option;
+import scala.Serializable;
 import scala.Tuple2;

-public class SparkSettings extends Settings {
+public class SparkSettings extends Settings implements Serializable {

private final SparkConf cfg;

@@ -65,7 +65,6 @@
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
-import org.apache.doris.spark.sql.DorisWriterOption;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
@@ -476,17 +475,13 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {

    /**
     * Choose a Doris BE node to request.
-    * @param options configuration of request
     * @param logger slf4j logger
     * @return the chosen Doris BE node
     * @throws IllegalArgumentException BE nodes is illegal
     */
    @VisibleForTesting
-   public static String randomBackend(SparkSettings sparkSettings , DorisWriterOption options , Logger logger) throws DorisException, IOException {
-       // set user auth
-       sparkSettings.setProperty(DORIS_REQUEST_AUTH_USER,options.user());
-       sparkSettings.setProperty(DORIS_REQUEST_AUTH_PASSWORD,options.password());
-       String feNodes = options.feHostPort();
+   public static String randomBackend(SparkSettings sparkSettings, Logger logger) throws DorisException, IOException {
+       String feNodes = sparkSettings.getProperty(DORIS_FENODES);
        String feNode = randomEndpoint(feNodes, logger);
        String beUrl = String.format("http://%s" + BACKENDS, feNode);
        HttpGet httpGet = new HttpGet(beUrl);
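
With this change, `randomBackend` pulls the FE node list from `SparkSettings` (`doris.fenodes`) instead of a separate writer option, then asks a randomly chosen FE for its backends. A sketch of the endpoint-selection step as its name suggests it behaves (an assumption; the real `randomEndpoint` lives elsewhere in `RestService`):

```scala
import scala.util.Random

// Assumed behavior of RestService.randomEndpoint: pick one FE node at random
// from the comma-separated "doris.fenodes" list.
def randomEndpoint(feNodes: String): String = {
  val nodes = feNodes.split(",").map(_.trim).filter(_.nonEmpty)
  nodes(Random.nextInt(nodes.length))
}
```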
