Commit 4d3ec34

test running all of them and fix md files
thimoonxy committed Sep 27, 2021
1 parent 056c550 commit 4d3ec34
Showing 32 changed files with 198 additions and 2,199 deletions.

FlinkDemo/src/main/scala/com/starrocks/flink/Bean2StarRocks.scala

@@ -59,8 +59,8 @@ object Bean2StarRocks {
StarRocksSink.sink(
// the table structure
TableSchema.builder()
- .field("NAME", DataTypes.VARCHAR(20))
- .field("SCORE", DataTypes.INT())
+ .field("name", DataTypes.VARCHAR(20))
+ .field("score", DataTypes.INT())
.build(),

/*
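For context, this hunk sits inside a `StarRocksSink.sink(schema, options, rowBuilder)` call. The sketch below is a reconstruction under assumptions, not the file's verbatim code: the `Score` case class and the option values are guessed from other snippets in this commit, and the `StarRocksSinkRowBuilder` import path follows the flink-connector-starrocks 1.1.x layout. The rename matters because the `TableSchema` field names are what the connector maps onto StarRocks columns, and the target table's DDL (see docs/05 further down) declares lowercase `name` and `score`.

```scala
import com.starrocks.connector.flink.StarRocksSink
import com.starrocks.connector.flink.row.StarRocksSinkRowBuilder
import com.starrocks.connector.flink.table.StarRocksSinkOptions
import org.apache.flink.table.api.{DataTypes, TableSchema}

case class Score(name: String, score: Int) // hypothetical bean

val sink = StarRocksSink.sink(
  // the table structure: field names must match the StarRocks columns
  TableSchema.builder()
    .field("name", DataTypes.VARCHAR(20))
    .field("score", DataTypes.INT())
    .build(),
  // connection options, mirroring values used elsewhere in this commit
  StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", "jdbc:mysql://master1:9030")
    .withProperty("load-url", "master1:8030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("database-name", "starrocks_demo")
    .withProperty("table-name", "demo2_flink_tb1")
    .build(),
  // fill the row slots in schema order from the bean
  new StarRocksSinkRowBuilder[Score] {
    override def accept(slots: Array[Object], bean: Score): Unit = {
      slots(0) = bean.name
      slots(1) = Int.box(bean.score)
    }
  }
)
```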

FlinkDemo/src/main/scala/com/starrocks/flink/Json2StarRocks.scala

@@ -65,7 +65,7 @@ object Json2StarRocks {
.withProperty("load-url", "master1:8030")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "demo2_flink_tb1")
.withProperty("table-name", "demo2_flink_tb2")
.withProperty("database-name", "starrocks_demo")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
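The `sink.properties.*` keys are pass-through Stream Load properties: `format=json` declares JSON rows, and `strip_outer_array=true` tells StarRocks to unwrap the outer JSON array of each flushed batch into individual rows. Below is a minimal sketch of such a job; it assumes the connector's raw-`String` sink variant, `StarRocksSink.sink(options)`, and uses hypothetical sample records:

```scala
import com.starrocks.connector.flink.StarRocksSink
import com.starrocks.connector.flink.table.StarRocksSinkOptions
import org.apache.flink.streaming.api.scala._

object JsonSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // each record is one JSON object matching the target table's columns
    val jsonStream: DataStream[String] = env.fromElements(
      """{"name":"lebron","score":30}""",
      """{"name":"kobe","score":24}"""
    )

    jsonStream.addSink(
      StarRocksSink.sink(
        StarRocksSinkOptions.builder()
          .withProperty("jdbc-url", "jdbc:mysql://master1:9030")
          .withProperty("load-url", "master1:8030")
          .withProperty("username", "root")
          .withProperty("password", "")
          .withProperty("database-name", "starrocks_demo")
          .withProperty("table-name", "demo2_flink_tb2")
          .withProperty("sink.properties.format", "json")
          .withProperty("sink.properties.strip_outer_array", "true")
          .build()
      )
    )
    env.execute("Json2StarRocks sketch")
  }
}
```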
12 changes: 6 additions & 6 deletions FlinkDemo/src/main/scala/com/starrocks/flink/Sql2StarRocks.scala

@@ -49,7 +49,7 @@ object Sql2StarRocks {
.uid("sourceStream-uid").name("sourceStream")
.setParallelism(1)

- val sourceTable = streamTableEnv.fromDataStream(source,'NAME,'SCORE)
+ val sourceTable = streamTableEnv.fromDataStream(source,'name,'score)
streamTableEnv.createTemporaryView("sourceTable",sourceTable)

/*

@@ -63,14 +63,14 @@ object Sql2StarRocks {
streamTableEnv.executeSql(
"""
|CREATE TABLE testTable(
- |`NAME` VARCHAR,
- |`SCORE` INT
+ |`name` VARCHAR,
+ |`score` INT
|) WITH (
|'connector' = 'starrocks',
|'jdbc-url'='jdbc:mysql://master1:9030?starrocksdb_demo',
|'load-url'='master1:8030',
- |'database-name' = 'starrocksdb_demo',
- |'table-name' = 'demo2_flink_tb1',
+ |'database-name' = 'starrocks_demo',
+ |'table-name' = 'demo2_flink_tb3',
|'username' = 'root',
|'password' = '',
|'sink.buffer-flush.max-rows' = '1000000',

@@ -93,7 +93,7 @@ object Sql2StarRocks {

streamTableEnv.executeSql(
"""
- |insert into testTable select `NAME`,`SCORE` from sourceTable
+ |insert into testTable select `name`,`score` from sourceTable
""".stripMargin)


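One pattern runs through all three hunks: Flink SQL identifiers are case-sensitive, so the columns of the view registered via `fromDataStream`, of the connector DDL, and of the INSERT's select list must agree exactly, and this commit settles on the lowercase names used by the StarRocks tables. Assembled end to end, the job looks roughly like this sketch (Flink 1.11 Scala API assumed; not the file's verbatim contents):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Sql2StarRocksSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(env)

    // register the custom source (MySource is defined in this repo) under
    // the same lowercase column names the StarRocks table uses...
    val source = env.addSource(new MySource)
      .uid("sourceStream-uid").name("sourceStream")
      .setParallelism(1)
    val sourceTable = streamTableEnv.fromDataStream(source, 'name, 'score)
    streamTableEnv.createTemporaryView("sourceTable", sourceTable)

    // CREATE TABLE testTable (... 'connector' = 'starrocks' ...) elided;
    // see the hunk above for the full WITH clause

    // ...so that `name`/`score` resolve in the case-sensitive SQL here
    streamTableEnv.executeSql(
      "insert into testTable select `name`,`score` from sourceTable")
  }
}
```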

FlinkDemo/src/main/scala/com/starrocks/flink/MySource.scala

@@ -19,7 +19,7 @@ import org.apache.flink.types.Row
import scala.util.Random

/**
- * 自定义数据源
+ * customised source
*/
class MySource extends SourceFunction[Row]{

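The rest of this class is easy to picture: a Flink `SourceFunction[Row]` only needs `run` and `cancel`. Below is a hypothetical reconstruction of the demo source, with the player names and score range inferred from the query results later in this commit rather than copied from the file:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.types.Row

import scala.util.Random

/**
  * customised source: emits a random (name, score) Row every second
  */
class MySource extends SourceFunction[Row] {

  @volatile private var running = true
  private val names = Seq("lebron", "kobe", "stephen")

  override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
    while (running) {
      ctx.collect(Row.of(names(Random.nextInt(names.length)),
        Int.box(Random.nextInt(100))))
      Thread.sleep(1000L)
    }
  }

  override def cancel(): Unit = running = false
}
```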
5 changes: 1 addition & 4 deletions README.md

@@ -53,10 +53,7 @@ json --> flink-connector ---> StarRocks
[flinkConnector_Sql2StarRocks](docs/07_flinkConnector_Sql2StarRocks.md)
```
flinkSql --> flink-connector --> StarRocks
- ```
-
- ## Chinese wiki
- -> [Wiki list](docs/cn/README_cn.md)
+ ```

## License

2 changes: 1 addition & 1 deletion SparkDemo/src/main/sh/genData.sh

@@ -21,6 +21,6 @@ topic=$3

echo "Sending time data to ${topic:=starrocks_t1_src} every ${interval:=15} seconds..."
while true ; do
python demo1_data_gen.py $lines| docker exec -i $id kafka-console-producer --topic "${topic:=starrocks_t1_src}" --broker-list localhost:9092
python ../py/demo1_data_gen.py $lines| docker exec -i $id kafka-console-producer --topic "${topic:=starrocks_t1_src}" --broker-list localhost:9092
sleep "${interval:=15}"
done
22 changes: 11 additions & 11 deletions docs/01_sparkStreaming2StarRocks.md

@@ -53,7 +53,7 @@ topic is empty by far:
Result: Random generation of integers up to 10 every 2s and send to topic spark_demo1_src.

```
- #bash ../sh/genData.sh 2 10 spark_demo1_src Usage: ../sh/genData.sh topicName interval
+ SparkDemo/src/main/sh# bash genData.sh 2 10 spark_demo1_src Usage: ../sh/genData.sh topicName interval
Sending time data to spark_demo1_src every 2 seconds...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
```

@@ -93,16 +93,16 @@ Connect to StarRocks via Mysql Client to check the result:

```
MySQL [starrocks_demo]> select * from demo1_spark_tb0 limit 5;
- +---------------------------+------------+------+--------+------+
- | site | date | hour | minute | uv |
- +---------------------------+------------+------+--------+------+
- | https://docs.starrocks.com/ | 2021-05-29 | 14 | 45 | NULL |
- | https://docs.starrocks.com/ | 2021-05-29 | 14 | 48 | NULL |
- | https://docs.starrocks.com/ | 2021-05-29 | 15 | 18 | NULL |
- | https://docs.starrocks.com/ | 2021-05-29 | 15 | 21 | NULL |
- | https://docs.starrocks.com/ | 2021-05-29 | 15 | 29 | NULL |
- +---------------------------+------------+------+--------+------+
- 5 rows in set (0.01 sec)
+ +-----------------------------+------------+------+--------+------+
+ | site                        | date       | hour | minute | uv   |
+ +-----------------------------+------------+------+--------+------+
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     40 | NULL |
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     43 | NULL |
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     58 | NULL |
+ | https://docs.starrocks.com/ | 2021-09-27 |   10 |     18 | NULL |
+ | https://docs.starrocks.com/ | 2021-09-27 |   10 |     24 | NULL |
+ +-----------------------------+------------+------+--------+------+
+ 5 rows in set (0.06 sec)
MySQL [starrocks_demo]> select count(distinct uv) uv from demo1_spark_tb0 ;
+------+
48 changes: 28 additions & 20 deletions docs/02_sparkConnector2StarRocks.md

@@ -52,15 +52,15 @@ Verify the result

```
MySQL [starrocks_demo]> select * from demo1_spark_tb1 limit 5;
- +--------------------------+------------+------+--------+--------------+
- | site | date | hour | minute | uid_list_str |
- +--------------------------+------------+------+--------+--------------+
- | https://www.starrocks.com/ | 2021-05-29 | 14 | 47 | 5282 |
- | https://www.starrocks.com/ | 2021-05-29 | 14 | 51 | 3157,7582 |
- | https://www.starrocks.com/ | 2021-05-29 | 14 | 55 | 2395,8287 |
- | https://www.starrocks.com/ | 2021-05-29 | 14 | 58 | 7021 |
- | https://www.starrocks.com/ | 2021-05-29 | 14 | 59 | 1041,9393 |
- +--------------------------+------------+------+--------+--------------+
+ +-----------------------------+------------+------+--------+--------------+
+ | site                        | date       | hour | minute | uid_list_str |
+ +-----------------------------+------------+------+--------+--------------+
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     40 | 9855,9978    |
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     41 | 3503         |
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     43 | 9414,9970    |
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     45 | 5902         |
+ | https://docs.starrocks.com/ | 2021-09-27 |    9 |     46 | 5827,6470    |
+ +-----------------------------+------------+------+--------+--------------+
5 rows in set (0.01 sec)
```

@@ -87,7 +87,7 @@ PROPERTIES (
## Performing

### add spark-connector jar into the project
- ![02_spark_idea1](./imgs/02_spark_idea1.png)
+ ```xml
+ <dependency>
+     <groupId>com.starrocks.connector</groupId>
+     <artifactId>spark</artifactId>
+     <version>1.0.0</version>
+     <scope>system</scope>
+     <systemPath>${project.basedir}/src/main/resources/starrocks-spark2_2.11-1.0.0.jar</systemPath>
+ </dependency>
+ ```
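Swapping the IDE screenshot for a pom snippet makes the setup reproducible: the connector jar ships inside the repo and is wired in as a Maven `system`-scope dependency. For an sbt build (an assumption; this repo uses Maven), the rough equivalent would be an unmanaged jar, or simply dropping the jar into the project's `lib/` directory:

```scala
// build.sbt — hypothetical sbt equivalent of the system-scope dependency
Compile / unmanagedJars += Attributed.blank(
  baseDirectory.value / "src" / "main" / "resources" / "starrocks-spark2_2.11-1.0.0.jar"
)
```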

### Run the demo

@@ -101,16 +109,16 @@ Compile and run com.starrocks.spark.SparkConnector2StarRocks

```
MySQL [starrocks_demo]> select * from demo1_spark_tb2 limit 5;
- +------+------------+------+--------+---------------------------+
- | uid | date | hour | minute | site |
- +------+------------+------+--------+---------------------------+
- | 10 | 2021-05-29 | 16 | 52 | https://docs.starrocks.com/ |
- | 17 | 2021-05-29 | 16 | 38 | https://www.starrocks.com/ |
- | 18 | 2021-05-29 | 15 | 30 | https://www.starrocks.com/ |
- | 18 | 2021-05-29 | 16 | 58 | https://www.starrocks.com/ |
- | 20 | 2021-05-29 | 16 | 34 | https://docs.starrocks.com/ |
- +------+------------+------+--------+---------------------------+
- 5 rows in set (0.02 sec)
+ +------+------------+------+--------+------------------------------+
+ | uid  | date       | hour | minute | site                         |
+ +------+------------+------+--------+------------------------------+
+ |   18 | 2021-09-27 |   10 |     49 | https://www.starrocks.com/   |
+ |   20 | 2021-09-27 |   10 |     42 | https://trial.starrocks.com/ |
+ |   65 | 2021-09-27 |   10 |     44 | https://trial.starrocks.com/ |
+ |   80 | 2021-09-27 |   10 |     41 | https://www.starrocks.com/   |
+ |   97 | 2021-09-27 |   11 |     27 | https://trial.starrocks.com/ |
+ +------+------------+------+--------+------------------------------+
+ 5 rows in set (0.15 sec)
```

# License
64 changes: 31 additions & 33 deletions docs/03_sparkLoad2StarRocks.md

@@ -53,10 +53,9 @@ Simulate csv file with 10000 lines, 2 cols and upload to hdfs


```
- [root@master1 data]# python data_wide.py 10000 2 > demo3_data1.csv
- [root@master1 data]# grep ^[0-9] demo3_data1.csv | wc -l
- 10000
- [root@master1 data]# head demo3_data1.csv
+ # on laptop
+ SparkDemo/src/main/py ]# python gen_wide.py 10000 2 > demo3_data1.csv
+ SparkDemo/src/main/py ]# head demo3_data1.csv
1 10
9 5
8 8
@@ -67,7 +66,11 @@ Simulate csv file with 10000 lines, 2 cols and upload to hdfs
2 7
3 3
6 5
+ SparkDemo/src/main/py ]# scp demo3_data1.csv root@master1:~/data/
+ # on server
+ [root@master1 ~]# hadoop fs -mkdir -p /starrocks-demo/data
+ [root@master1 ~]# cd ~/data
[root@master1 data]# hadoop fs -put demo3_data1.csv /starrocks-demo/data/
```
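`gen_wide.py` itself is not part of this diff; judging from the output above, it prints N rows of space-separated random integers from 1 to 10. A throwaway Scala equivalent, hypothetical and not in the repo:

```scala
import scala.util.Random

// usage: scala GenWide 10000 2 > demo3_data1.csv
object GenWide extends App {
  val Array(rows, cols) = args.take(2).map(_.toInt)
  for (_ <- 1 to rows)
    println(Seq.fill(cols)(Random.nextInt(10) + 1).mkString(" "))
}
```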

@@ -95,8 +98,8 @@ Create spark1 resource in starrocks:

```
-- add broker1
- MySQL [(none)]> ALTER SYSTEM ADD BROKER broker1 "master1:8000";
- Query OK, 0 rows affected (0.04 sec)
+ ALTER SYSTEM ADD BROKER broker1 "master1:8000";
-- yarn HA cluster mode
CREATE EXTERNAL RESOURCE "spark1"

@@ -260,23 +263,18 @@ PROPERTIES (
"hive.metastore.uris" = "thrift://master1:9083"
);
MySQL [starrocks_demo]> create table demo3_spark_tb2 like demo3_spark_tb1;
Query OK, 0 rows affected (0.07 sec)
MySQL [starrocks_demo]> CREATE EXTERNAL TABLE hive_t1
-> (
-> k1 string,
-> v1 string
-> )
-> ENGINE=hive
-> properties (
-> "resource" = "hive0",
-> "database" = "default",
-> "table" = "t1");
Query OK, 0 rows affected (0.03 sec)
CREATE TABLE demo3_spark_tb2 like demo3_spark_tb1;
CREATE EXTERNAL TABLE hive_t1
(
k1 string,
v1 string
)
ENGINE=hive
properties (
"resource" = "hive0",
"database" = "default",
"table" = "t1");
```

### Spark load

@@ -306,23 +304,23 @@ PROPERTIES
show load

```
- *************************** 9. row ***************************
- JobId: 14039
+ *************************** 3. row ***************************
+ JobId: 12023
Label: label2
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000
TaskInfo: cluster:spark1; timeout(s):3600; max_filter_ratio:0.2
ErrorMsg: NULL
- CreateTime: 2021-05-31 21:05:45
- EtlStartTime: 2021-05-31 21:06:12
- EtlFinishTime: 2021-05-31 21:06:46
- LoadStartTime: 2021-05-31 21:06:46
- LoadFinishTime: 2021-05-31 21:06:49
- URL: http://worker1:20888/proxy/application_1622453682723_0025/
- JobDetails: {"Unfinished backends":{"00000000-0000-0000-0000-000000000000":[]},"ScannedRows":9999,"TaskNumber":1,"All backends":{"00000000-0000-0000-0000-000000000000":[-1]},"FileNumber":0,"FileSize":0}
- 9 rows in set (0.00 sec)
+ CreateTime: 2021-09-27 15:01:10
+ EtlStartTime: 2021-09-27 15:01:27
+ EtlFinishTime: 2021-09-27 15:02:02
+ LoadStartTime: 2021-09-27 15:02:02
+ LoadFinishTime: 2021-09-27 15:02:03
+ URL: http://worker1:20888/proxy/application_1632723836409_0002/
+ JobDetails: {"Unfinished backends":{"00000000-0000-0000-0000-000000000000":[]},"ScannedRows":10000,"TaskNumber":1,"All backends":{"00000000-0000-0000-0000-000000000000":[-1]},"FileNumber":0,"FileSize":0}
+ 3 rows in set (0.00 sec)
```

### Verification
49 changes: 23 additions & 26 deletions docs/05_flinkConnector_Bean2StarRocks.md

@@ -3,20 +3,19 @@
## DDL

```
- MySQL [starrocks_demo]> CREATE TABLE `starrocks_demo`.`demo2_flink_tb1` (
- -> `name` VARCHAR(100) NOT NULL COMMENT "姓名",
- -> `score` INT(2) NOT NULL COMMENT "得分"
- -> ) ENGINE=OLAP
- -> DUPLICATE KEY(`name`)
- -> COMMENT "OLAP"
- -> DISTRIBUTED BY HASH(`name`) BUCKETS 3
- -> PROPERTIES (
- -> "replication_num" = "1",
- -> "in_memory" = "false",
- -> "storage_format" = "V2"
- -> );
- Query OK, 0 rows affected (0.11 sec)
+ USE starrocks_demo;
+ CREATE TABLE `starrocks_demo`.`demo2_flink_tb1` (
+     `name` VARCHAR(100) NOT NULL COMMENT "name",
+     `score` INT(2) NOT NULL COMMENT "score"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`name`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`name`) BUCKETS 3
+ PROPERTIES (
+     "replication_num" = "1",
+     "in_memory" = "false",
+     "storage_format" = "V2"
+ );
```
## Performing

@@ -31,10 +30,10 @@
~/app/flink-1.11.0/bin/flink run \
-m yarn-cluster \
--yarnname Demo \
- -c com.starrocks.flink.Demo1 \
+ -c com.starrocks.flink.Bean2StarRocks \
-yjm 1048 -ytm 1048 \
-ys 1 -d \
- ./demo.jar
+ ./StarRocks.jar
```
flink ui
![05_flink_ui_1](imgs/05_flink_ui_1.png)

@@ -44,16 +43,14 @@ flink ui

```
MySQL [starrocks_demo]> select * from demo2_flink_tb1 limit 5;
- +--------+-------+
- | name | score |
- +--------+-------+
- | lebron | 43 |
- | lebron | 11 |
- | lebron | 42 |
- | lebron | 96 |
- | kobe | 29 |
- +--------+-------+
- 5 rows in set (0.08 sec)
+ +---------+-------+
+ | name    | score |
+ +---------+-------+
+ | lebron  |    37 |
+ | kobe    |    48 |
+ | stephen |    36 |
+ +---------+-------+
+ 3 rows in set (0.01 sec)
MySQL [starrocks_demo]> select count(1) from demo2_flink_tb1;
+----------+