Skip to content

Commit

Permalink
[Feature][Flink-SQL-connector] add flink sql connector kafka and docs (
Browse files Browse the repository at this point in the history
…apache#1878)

* [Feature][Flink-SQL-connector] add flink sql connector kafka and docs

* address review comment

Co-authored-by: ruanwenjun <wenjun@apache.org>
  • Loading branch information
legendtkl and ruanwenjun authored May 16, 2022
1 parent 91ddc2e commit 6cade31
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 25 deletions.
74 changes: 74 additions & 0 deletions docs/en/connector/flink-sql/Kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Flink SQL Kafka Connector

## Description

With kafka connector, we can read data from kafka and write data to kafka using Flink SQL. Refer to the [Kafka connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/) for more details.


## Usage
Let us have a brief example to show how to use the connector from end to end.

### 1. kafka prepare
Please refer to the [Kafka QuickStart](https://kafka.apache.org/quickstart) to prepare kafka environment and produce data like following:

```bash
$ bin/kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092
```

After executing the command, we will come to the interactive mode. Print the following message to send data to kafka.
```bash
{"id":1,"name":"abc"}
>{"id":2,"name":"def"}
>{"id":3,"name":"dfs"}
>{"id":4,"name":"eret"}
>{"id":5,"name":"yui"}
```

### 2. prepare seatunnel configuration
Here is a simple example of seatunnel configuration.
```sql
SET table.dml-sync = true;

CREATE TABLE events (
id INT,
name STRING
) WITH (
'connector' = 'kafka',
'topic'='<topic-name>',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

CREATE TABLE print_table (
id INT,
name STRING
) WITH (
'connector' = 'print',
'sink.parallelism' = '1'
);

INSERT INTO print_table SELECT * FROM events;
```

### 3. start flink local cluster
```bash
$ ${FLINK_HOME}/bin/start-cluster.sh
```

### 4. start Flink SQL job
Execute the following command in seatunnel home path to start the Flink SQL job.
```bash
$ bin/start-seatunnel-sql.sh -c config/kafka.sql.conf
```

### 5. verify result
After the job submitted, we can see the data printing by connector 'print' in taskmanager's log .
```text
+I[1, abc]
+I[2, def]
+I[3, dfs]
+I[4, eret]
+I[5, yui]
```
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
<guava.version>19.0</guava.version>
<auto-service.version>1.0.1</auto-service.version>
<jmockdata.version>4.3.0</jmockdata.version>
<snappy-java.version>1.1.8.3</snappy-java.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -611,6 +612,24 @@
<artifactId>jmockdata</artifactId>
<version>${jmockdata.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy-java.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
<artifactId>flink-sql-connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seatunnel-connectors-flink-sql</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-sql-connector-kafka</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

<modules>
<module>flink-sql-connector-jdbc</module>
<module>flink-sql-connector-kafka</module>
</modules>


Expand Down
12 changes: 0 additions & 12 deletions seatunnel-dist/release-docs/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Phoenix Core (org.apache.phoenix:phoenix-core:5.0.0-HBase-2.0 - http://www.apache.org/phoenix/phoenix-core/)
(The Apache Software License, Version 2.0) Plexus Interpolation API (org.codehaus.plexus:plexus-interpolation:1.19 - http://plexus.codehaus.org/plexus-components/plexus-interpolation)
(The Apache Software License, Version 2.0) Retrofit (com.squareup.retrofit2:retrofit:2.9.0 - https://github.com/square/retrofit)
(The Apache Software License, Version 2.0) Snappy for Java (org.xerial.snappy:snappy-java:1.0.5 - http://github.com/xerial/snappy-java/)
(The Apache Software License, Version 2.0) SparseBitSet (com.zaxxer:SparseBitSet:1.2 - https://github.com/brettwooldridge/SparseBitSet)
(The Apache Software License, Version 2.0) Spymemcached (net.spy:spymemcached:2.12.3 - http://www.couchbase.org/code/couchbase/java)
(The Apache Software License, Version 2.0) StAX API (stax:stax-api:1.0.1 - http://stax.codehaus.org/)
Expand Down Expand Up @@ -884,10 +883,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) secure-sm (org.elasticsearch:elasticsearch-secure-sm:6.3.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) server (org.elasticsearch:elasticsearch:6.3.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) server (org.elasticsearch:elasticsearch:7.5.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) snappy-java (org.xerial.snappy:snappy-java:1.1.2.6 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) snappy-java (org.xerial.snappy:snappy-java:1.1.4 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) snappy-java (org.xerial.snappy:snappy-java:1.1.7.1 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) snappy-java (org.xerial.snappy:snappy-java:1.1.7.3 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) transport (org.elasticsearch.client:transport:6.3.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) transport (org.elasticsearch.client:transport:7.5.1 - https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) transport-netty4 (org.elasticsearch.plugin:transport-netty4-client:6.3.1 - https://github.com/elastic/elasticsearch)
Expand All @@ -911,14 +906,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(MIT License) Joni (org.jruby.joni:joni:2.1.11 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
(MIT License) Joni (org.jruby.joni:joni:2.1.2 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
(MIT License) Joni (org.jruby.joni:joni:2.1.27 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
(MIT License) SLF4J API Module (org.slf4j:slf4j-api:1.6.4 - http://www.slf4j.org)
(MIT License) SLF4J API Module (org.slf4j:slf4j-api:1.7.15 - http://www.slf4j.org)
(MIT License) SLF4J API Module (org.slf4j:slf4j-api:1.7.16 - http://www.slf4j.org)
(MIT License) SLF4J API Module (org.slf4j:slf4j-api:1.7.25 - http://www.slf4j.org)
(MIT License) SLF4J API Module (org.slf4j:slf4j-api:1.7.30 - http://www.slf4j.org)
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.10 - http://www.slf4j.org)
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.15 - http://www.slf4j.org)
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.16 - http://www.slf4j.org)
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.25 - http://www.slf4j.org)
(MIT License) pyrolite (net.razorvine:pyrolite:4.13 - https://github.com/irmen/Pyrolite)
(MIT License) scopt (com.github.scopt:scopt_2.11:3.5.0 - https://github.com/scopt/scopt)
Expand Down
1 change: 0 additions & 1 deletion seatunnel-e2e/seatunnel-flink-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

Expand Down
1 change: 0 additions & 1 deletion seatunnel-e2e/seatunnel-flink-sql-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>

Expand Down
1 change: 0 additions & 1 deletion seatunnel-e2e/seatunnel-spark-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
1 change: 0 additions & 1 deletion seatunnel-examples/seatunnel-flink-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>

Expand Down
1 change: 0 additions & 1 deletion seatunnel-examples/seatunnel-flink-sql-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
</project>
1 change: 0 additions & 1 deletion seatunnel-examples/seatunnel-spark-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
</project>
7 changes: 0 additions & 7 deletions tools/dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -633,18 +633,11 @@ shims-0.9.0.jar
shims-0.9.22.jar
sigar-1.6.5.132.jar
sketches-core-0.9.0.jar
slf4j-api-1.7.15.jar
slf4j-api-1.7.16.jar
slf4j-api-1.7.21.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.15.jar
slf4j-log4j12-1.7.16.jar
slf4j-log4j12-1.7.25.jar
snakeyaml-1.17.jar
snakeyaml-1.24.jar
snappy-0.3.jar
snappy-java-1.1.4.jar
snappy-java-1.1.7.1.jar
snappy-java-1.1.8.3.jar
spark-catalyst_2.11-2.4.0.jar
spark-hive-thriftserver_2.11-2.3.4.jar
Expand Down

0 comments on commit 6cade31

Please sign in to comment.