The Pulsar Flink connector integrates Apache Pulsar with Apache Flink to enable elastic data processing.
Chinese document: README_CN
- Java 8 or higher
- Flink 1.9.0 or higher
- Pulsar 2.4.0 or higher
For projects using SBT, Maven, or Gradle, you can set the following dependency coordinates for your project.
- The `FLINK_VERSION` parameter currently has `1.9.0` and `1.11.1` options.
  - Version `1.9.0` supports Flink 1.9-1.10.
  - Version `1.11.1` supports Flink 1.11+.
- The `SCALA_BINARY_VERSION` parameter corresponds to the Scala version used by Flink; `2.11` and `2.12` are available.
- `PULSAR_FLINK_VERSION` is the version of this connector.
groupId = io.streamnative.connectors
artifactId = pulsar-flink-{{SCALA_BINARY_VERSION}}-{{FLINK_VERSION}}
version = {{PULSAR_FLINK_VERSION}}
The JAR package is hosted in the StreamNative Bintray Maven repository.
For a Maven project, add the repository configuration to your pom.xml as follows:
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>

For a Gradle project, add the repository configuration to build.gradle as follows:
repositories {
maven {
url 'https://dl.bintray.com/streamnative/maven'
}
}

For a Maven project, to build an application JAR that contains all the dependencies required by the library and the Pulsar Flink connector, you can use the following shade plugin definition template:
<plugin>
<!-- Shade all the dependencies to avoid conflicts -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<minimizeJar>false</minimizeJar>
<artifactSet>
<includes>
<include>io.streamnative.connectors:*</include>
<!-- more libs to include here -->
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>

For a Gradle project, to build an application JAR containing all the dependencies required by the library and the Pulsar Flink connector, you can use the following shade plugin definition template:
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:6.0.0'
}
}
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'

Like any Flink application, ./bin/flink run is used to compile and start your application.
If you have built a JAR containing the dependencies using the shade plugin above, you can use --classpath to add your JAR to flink run.
The path must be specified with a protocol (for example, file://) and must be accessible on all nodes.
Example:
$ ./bin/flink run
-c com.example.entry.point.ClassName file://path/to/jars/your_fat_jar.jar
...
When using the interactive Scala shell bin/start-scala-shell.sh, you can use the --addclasspath parameter to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.
Example:
$ ./bin/start-scala-shell.sh remote <hostname> <portnumber>
--addclasspath pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar
For more information about submitting applications using CLI, please refer to Command-Line Interface.
To use the SQL Client Beta and write SQL queries to manipulate the data in Pulsar, you can use the --addclasspath parameter to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.
Example:
$ ./bin/sql-client.sh embedded --jar pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar
By default, to use the Pulsar catalog in the SQL Client and register it automatically at startup, the SQL Client reads its configuration from the environment file ./conf/sql-client-defaults.yaml. You need to add the Pulsar catalog to the catalogs section of this YAML file:
catalogs:
- name: pulsarcatalog
type: pulsar
default-database: tn/ns
service-url: "pulsar://localhost:6650"
admin-url: "http://localhost:8080"Flink's Pulsar consumer is called FlinkPulsarSource<T> or FlinkPulsarRowSource which only has the function of automatically inferring data patterns. It provides access to one or more Pulsar topics.
The constructor takes the following parameters:
- The service address `serviceUrl` and the admin address `adminUrl` used to connect to the Pulsar instance.
- When using `FlinkPulsarSource`, you need to set `DeserializationSchema<T>` or `PulsarDeserializationSchema<T>`.
- The `Properties` parameter, which configures the behavior of the Pulsar consumer. Among the `Properties`, exactly one of `topic`, `topics`, or `topicsPattern` is required to configure the topic information for consumption. (`topics` is multiple topics separated by commas; `topicsPattern` is a Java regular expression that can match several topics.)

The consumption start position can be set with FlinkPulsarSource's setStartFromLatest, setStartFromEarliest, setStartFromSpecificOffsets, setStartFromSubscription, and so on.
Example:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "test-source-topic");
props.setProperty("partitiondiscoveryintervalmillis", "5000");
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
// or setStartFromLatest, setStartFromSpecificOffsets, setStartFromSubscription
source.setStartFromEarliest();
DataStream<String> stream = see.addSource(source);
// chain operations on dataStream of String and sink the output
// end method chaining
see.execute();

In many cases, the timestamp of a record is embedded in the record itself (explicitly or implicitly). In addition, users may want to emit watermarks either periodically or irregularly, for example based on special records in the Pulsar stream that carry the current event-time watermark. For these cases, the Flink Pulsar source allows specifying an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks.
Internally, an instance of the assigner runs for each Pulsar partition. Once such an assigner is specified, for each record read from Pulsar, extractTimestamp(T element, long previousElementTimestamp) is called to assign a timestamp to the record, and then Watermark getCurrentWatermark() (for periodic assigners) or Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) (for punctuated assigners) is called to determine whether a new watermark should be emitted and with which timestamp.
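For illustration, here is a minimal sketch of a periodic assigner, assuming a hypothetical SensorReading POJO whose event time is exposed through a getTimestamp() accessor (both the POJO and the accessor are placeholders, not part of the connector):

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Periodic assigner: extracts the event time from each record and emits a
// watermark that trails the highest timestamp seen so far by a fixed bound.
public class SensorWatermarkAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 3_000L; // illustrative bound
    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        long ts = element.getTimestamp(); // assumed accessor on the hypothetical POJO
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Emit no watermark until at least one element has been processed.
        return currentMaxTimestamp == Long.MIN_VALUE
                ? null
                : new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}
```

Such an assigner can then be attached with assignTimestampsAndWatermarks(...) on the DataStream produced from the source (or, depending on the connector version, on the source itself).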
Use a FlinkPulsarSink instance to handle POJO types, or a FlinkPulsarRowSink to handle the Flink Row type.
It allows the record stream to be written to one or more Pulsar topics.
Example:
FlinkPulsarSink<Person> sink = new FlinkPulsarSink(
serviceUrl,
adminUrl,
Optional.of(topic), // mandatory target topic or use `Optional.empty()` if sink to different topics for each record
props,
TopicKeyExtractor.NULL, // replace this to extract key or topic for each record
Person.class,
RecordSchemaType.AVRO);
stream.addSink(sink);

If the topic information is included in the record, you can customize a TopicKeyExtractor to distribute the messages to different topics.
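As a hedged sketch, a custom extractor for the Person type from the example above might look like the following; the serializeKey/getTopic method names are assumed from the TopicKeyExtractor interface in the connector version in use, and the getName()/getCountryCode() accessors are hypothetical:

```java
// Hypothetical extractor: keys each message by name and routes it to a
// per-country topic. Adjust the overridden methods to match the
// TopicKeyExtractor interface shipped with your connector version.
TopicKeyExtractor<Person> extractor = new TopicKeyExtractor<Person>() {
    @Override
    public byte[] serializeKey(Person person) {
        return person.getName().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    @Override
    public String getTopic(Person person) {
        return "persistent://public/default/person-" + person.getCountryCode();
    }
};

FlinkPulsarSink<Person> routedSink = new FlinkPulsarSink<>(
        serviceUrl,
        adminUrl,
        Optional.empty(),   // no fixed topic; the extractor chooses one per record
        props,
        extractor,
        Person.class,
        RecordSchemaType.AVRO);
stream.addSink(routedSink);
```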
After enabling Flink checkpoints, FlinkPulsarSink and FlinkPulsarRowSink can provide an at-least-once delivery guarantee.
In addition to enabling Flink checkpointing, you should also configure setLogFailuresOnly(boolean) and setFlushOnCheckpoint(boolean).
setFlushOnCheckpoint(boolean): set to true by default. When enabled, pending Pulsar records are flushed during the checkpoint's snapshotState, which ensures that all records written before the checkpoint have reached Pulsar. Note that Flink's at-least-once setting must be enabled at the same time.
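A minimal sketch, reusing the see environment and the sink from the examples above (the checkpoint interval is illustrative):

```java
// Enable Flink checkpointing so the sink can flush pending writes in snapshotState.
see.enableCheckpointing(60_000); // checkpoint every 60 seconds

sink.setLogFailuresOnly(false);  // fail the job on write errors instead of only logging them
sink.setFlushOnCheckpoint(true); // flush pending writes at each checkpoint (the default)
```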
The Pulsar Flink connector fully supports Flink's Table API, covering the following:
- Connector
- Catalog
- SQL and DDL (DDL is supported in Flink 1.11)
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(see);
String topic = "";
String tableName = "pulsarTable";
TableSchema tSchema = TableSchema.builder().field("value", DataTypes.BOOLEAN()).build();
tEnv.connect(
new Pulsar()
.urls(getServiceUrl(), getAdminUrl())
.topic(topic)
.startFromEarliest()
.property(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000")
)
.withSchema(new Schema().schema(tSchema))
.inAppendMode()
.createTemporaryTable(tableName);
Table t = tEnv.sqlQuery("select `value` from " + tableName);

Flink always searches for tables, views, and UDFs in the current catalog and database. To use the Pulsar catalog and treat topics in Pulsar as tables in Flink, use the pulsarcatalog defined in ./conf/sql-client-defaults.yaml.
tableEnv.useCatalog("pulsarcatalog")
tableEnv.useDatabase("public/default")
tableEnv.scan("topic0")Flink SQL> USE CATALOG pulsarcatalog;
Flink SQL> USE `public/default`;
Flink SQL> select * from topic0;

The following configuration is optional in the environment file, or it can be overridden in a SQL client session with the SET command.

| Option | Value | Default | Description |
|---|---|---|---|
| `default-database` | The default database name. | public/default | When using the Pulsar catalog, topics in Pulsar are regarded as tables in Flink, therefore, `database` is another name for `tenant/namespace`. The database is the basic path for table lookup or creation. |

Note: Because the delete operation is dangerous, deleting a tenant/namespace is not supported in the Flink catalog.
set global.disable.operator.chain = true;
create table test_flink_sql(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`rchannel` VARCHAR,
`be_time` bigint,
`activity_id` VARCHAR,
`country_code` VARCHAR,
`os` VARCHAR,
`recv_time` bigint,
`remark` VARCHAR,
`client_ip` VARCHAR,
`day` as TO_DATE(rtime),
`hour` as date_format(rtime,'HH')
) with (
'connector.type' = 'pulsar',
'connector.version' = '1',
'connector.topic' = 'persistent://test/test-gray/test_flink_sql',
'connector.service-url' = 'pulsar://xxx',
'connector.admin-url' = 'http://xxx',
'connector.startup-mode' = 'external-subscription',
'connector.sub-name' = 'test_flink_sql_v1',
'connector.properties.0.key' = 'pulsar.reader.readerName',
'connector.properties.0.value' = 'test_flink_sql_v1',
'connector.properties.1.key' = 'pulsar.reader.subscriptionRolePrefix',
'connector.properties.1.value' = 'test_flink_sql_v1',
'connector.properties.2.key' = 'pulsar.reader.receiverQueueSize',
'connector.properties.2.value' = '1000',
'connector.properties.3.key' = 'partitiondiscoveryintervalmillis',
'connector.properties.3.value' = '5000',
'format.type' = 'json',
'format.derive-schema' = 'true',
'format.ignore-parse-errors' = 'true',
'update-mode' = 'append'
);
insert into hive.test.test_flink_sql
select
rip, rtime,
if (uid is null, 0, uid) as uid,
if (activity_id is null,'', activity_id) as activity_id,
if (country_code is null,'', country_code) as country_code,
if (os is null,'', os) as os,
if (recv_time is null, 0, recv_time) as recv_time,
if (remark is null,'', remark) as remark,
if (client_ip is null,'', client_ip) as client_ip,
cast(`day` as string) as `day`,
cast(`hour` as string) as `hour`
from test_flink_sql;

DDL is supported in the Flink 1.11 package, and more detailed parameters can be set when creating a table.
DeserializationSchema is used to decode source records, but its core method can only decode the Pulsar Message#value. In custom scenarios where users need more information from the Message, this is not sufficient.
Therefore, the pulsar-flink connector does not use DeserializationSchema directly; instead, it defines PulsarDeserializationSchema<T>, which leaves users more room for extension.
To support existing DeserializationSchema instances, wrap them with new PulsarDeserializationSchemaWrapper<>(deserializationSchema).
The pulsar-flink connector provides two DeserializationSchema decoders:
-
JsonDeser: When using JSONSchema for pulsar topic, you can create aDeserializationSchemainstance throughJsonDeser.of(POJO_CLASS_NAME.class). -
AvroDeser: When using AVROSchema for pulsar topic, you can useAvroDeser.of(POJO_CLASS_NAME.class)forDeserializationSchemainstance.
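For example, a sketch of wiring the JSON decoder into a source, assuming a hypothetical Person POJO; the topic name is illustrative, and whether you pass the decoder directly or wrap it depends on whether the constructor you use expects a DeserializationSchema or a PulsarDeserializationSchema:

```java
Properties props = new Properties();
props.setProperty("topic", "persistent://public/default/person-topic"); // illustrative topic

// Pass the decoder directly where a DeserializationSchema is accepted ...
FlinkPulsarSource<Person> source =
        new FlinkPulsarSource<>(serviceUrl, adminUrl, JsonDeser.of(Person.class), props);

// ... or wrap it where a PulsarDeserializationSchema is expected.
PulsarDeserializationSchema<Person> wrapped =
        new PulsarDeserializationSchemaWrapper<>(JsonDeser.of(Person.class));
```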
From Flink 1.9 to 1.11 there were major changes, and many APIs are incompatible; in particular, schema handling differs between the two versions.
In Flink 1.9, configuring the schema parameter when creating a table is optional. In Flink 1.11, the schema must be specified and must be consistent with the TableSource return type.
This affects how PulsarDeserializer decodes Flink row types, with two differences:
- Differences in the types of the extended fields:
| Column | Type in Flink 1.9 | Type in Flink 1.11 |
|---|---|---|
| `__key` | Bytes | Bytes |
| `__topic` | String | String |
| `__messageId` | Bytes | Bytes |
| `__publishTime` | Timestamp | LocalDateTime |
| `__eventTime` | Timestamp | LocalDateTime |
- Extended field configuration:
  - In Flink 1.9, the extended fields are added by default.
  - Flink 1.11 does not add the extended fields by default. They are enabled when `use-extend-field=true` is configured, and the extended fields must be declared in the schema. In catalog mode, they are enabled by default.
| Parameters | Default Value | Description | Effective Range |
|---|---|---|---|
| topic | null | A single Pulsar topic | source |
| topics | null | Multiple Pulsar topics, separated by commas | source |
| topicspattern | null | A Java regular expression matching multiple Pulsar topics | source |
| partitiondiscoveryintervalmillis | -1 | Interval, in milliseconds, for automatically discovering added or removed topics; -1 disables discovery | source |
| clientcachesize | 5 | Number of cached Pulsar clients | source, sink |
| auth-params | null | Authentication parameters for the Pulsar client | source, sink |
| auth-plugin-classname | null | Authentication plugin class name for the Pulsar client | source, sink |
| flushoncheckpoint | true | Flush messages to Pulsar at each checkpoint | sink |
| failonwrite | false | When a sink error occurs, continue to confirm the message | sink |
| polltimeoutms | 120000 | Timeout, in milliseconds, for waiting for the next message | source |
| failondataloss | true | Whether to fail when data is lost | source |
| commitmaxretries | 3 | Maximum number of retries when committing the offset to Pulsar | source |
| use-extend-field | false | Whether PulsarDeserializer adds the extended fields when decoding messages. Only effective in Flink 1.11; Flink 1.9 always adds the extended fields. | source |
| startup-mode | null | earliest or latest; the position from which subscribers start consuming; required | catalog |
| table-default-partitions | 5 | Number of partitions used when creating a topic | catalog |
| pulsar.reader.* | | For detailed configuration of the Pulsar reader, refer to the Pulsar reader documentation | source |
| pulsar.reader.subscriptionRolePrefix | flink-pulsar- | Prefix of the automatically created subscription name when no subscriber is specified | source |
| pulsar.reader.receiverQueueSize | 1000 | Receive queue size | source |
| pulsar.producer.* | | For detailed configuration of the Pulsar producer, refer to the Pulsar producer documentation | sink |
| pulsar.producer.sendTimeoutMs | 30000 | Timeout, in milliseconds, when sending a message | sink |
| pulsar.producer.blockIfQueueFull | false | When the producer's queue is full, block the send method instead of throwing an exception | sink |
pulsar.reader.* and pulsar.producer.* specify more detailed configuration of Pulsar behavior; replace * with a Pulsar configuration name, as documented in the links referenced in the table.
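In the DataStream API, these options are passed through the same Properties object used for the source or sink; a small sketch with illustrative values:

```java
Properties props = new Properties();
props.setProperty("topic", "test-source-topic");
// Fine-grained reader settings, forwarded to the underlying Pulsar reader.
props.setProperty("pulsar.reader.receiverQueueSize", "2000");
props.setProperty("pulsar.reader.subscriptionRolePrefix", "my-flink-job");

FlinkPulsarSource<String> source =
        new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
```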
In DDL statements, the above parameters are used in an adjusted format. For example, to configure pulsar.reader.readerName=test_flink_sql_v1:

'connector.properties.0.key' = 'pulsar.reader.readerName',    -- parameter name
'connector.properties.0.value' = 'test_flink_sql_v1',         -- parameter value
Example:
create table test_flink_sql(
`data` VARCHAR
) with (
'connector.type' = 'pulsar',
'connector.version' = '1',
'connector.topic' = 'persistent://test/test-gray/test_flink_sql',
'connector.service-url' = 'pulsar://xxx',
'connector.admin-url' = 'http://xxx',
'connector.startup-mode' = 'earliest',  -- subscription mode
'connector.properties.0.key' = 'pulsar.reader.readerName',  -- parameter name
'connector.properties.0.value' = 'test_flink_sql_v1',  -- parameter value
'connector.properties.1.key' = 'pulsar.reader.subscriptionRolePrefix',
'connector.properties.1.value' = 'test_flink_sql_v1',
'connector.properties.2.key' = 'pulsar.reader.receiverQueueSize',
'connector.properties.2.value' = '1000',
'connector.properties.3.key' = 'partitiondiscoveryintervalmillis',  -- parameter name
'connector.properties.3.value' = '5000',  -- parameter value
'update-mode' = 'append'
);

For a Pulsar instance configured with authentication, the Pulsar Flink connector can be configured similarly to the regular Pulsar client.
FlinkPulsarSource, FlinkPulsarRowSource, FlinkPulsarSink, and FlinkPulsarRowSink all come with a constructor that lets you pass in ClientConfigurationData as one of the parameters. Construct a ClientConfigurationData first and pass it to the corresponding constructor.
For example:
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(serviceUrl);
conf.setAuthPluginClassName(className);
conf.setAuthParams(params);
Properties props = new Properties();
props.setProperty("topic", "test-source-topic");
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(adminUrl, conf, new SimpleStringSchema(), props);

If you want to build the Pulsar Flink connector, which reads data from Pulsar and writes results back to Pulsar, follow the steps below.
- Check out the source code.

  $ git clone https://github.com/streamnative/pulsar-flink.git
  $ cd pulsar-flink

- Install Docker.

  The pulsar-flink connector uses Testcontainers for integration tests. To run the integration tests, make sure you have Docker installed.

- Set a Java version.

  Change `java.version` and `java.binary.version` in `pom.xml`.
  The Java version should be consistent with the Java version of the Flink you use.

- Build the project.

  $ mvn clean install -DskipTests

- Run the tests.

  $ mvn clean install

Once the installation is finished, a fat JAR is generated under both the local Maven repository and the `target` directory.