File tree Expand file tree Collapse file tree 2 files changed +40
-1
lines changed Expand file tree Collapse file tree 2 files changed +40
-1
lines changed Original file line number Diff line number Diff line change
1
+ # KafkaIO : Dataflow Unbounded Source and Sink for Kafka Topics
2
+
3
+ KafkaIO provides unbounded source and sink for [ Kafka] ( http://kafka.apache.org/ )
4
+ topics. Kafka versions 0.9 and above are supported.
5
+
6
+ ## Basic Usage
7
+
8
+ * Read from a topic with 8 byte long keys and string values:
9
+ ``` java
10
+ PCollection<KV<Long , String > > kafkaRecords =
11
+ pipeline
12
+ .apply(KafkaIO . read()
13
+ .withBootstrapServers(" broker_1:9092,broker_2:9092" )
14
+ .withTopics(ImmutableList . of(" topic_a" ))
15
+ .withKeyCoder(BigEndianLongCoder . of())
16
+ .withValueCoder(StringUtf8Coder . of())
17
+ .withoutMetadata()
18
+ );
19
+ ```
20
+
21
+ * Write the same PCollection to a Kafka topic:
22
+ ``` java
23
+ kafkaRecords. apply(KafkaIO . write()
24
+ .withBootstrapServers(" broker_1:9092,broker_2:9092" )
25
+ .withTopic(" results" )
26
+ .withKeyCoder(BigEndianLongCoder . of())
27
+ .withValueCoder(StringUtf8Coder . of())
28
+ );
29
+ ```
30
+
31
+ Please see JavaDoc for KafkaIO in
32
+ [ KafkaIO.java] ( https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100 )
33
+ for complete documentation and a more descriptive usage example.
34
+
35
+ ## Release Notes
36
+ * ** 0.2.0** : Assign one split for each of the Kafka topic partitions. This makes Dataflow
37
+ [ Update] ( https://cloud.google.com/dataflow/pipelines/updating-a-pipeline )
38
+ from previous version incompatible.
39
+ * ** 0.1.0** : KafkaIO with support for Unbounded Source and Sink.
Original file line number Diff line number Diff line change 25
25
<artifactId >google-cloud-dataflow-java-contrib-kafka</artifactId >
26
26
<name >Google Cloud Dataflow Kafka IO Library</name >
27
27
<description >Library to read Kafka topics.</description >
28
- <version >0.1 .0-SNAPSHOT</version >
28
+ <version >0.2 .0-SNAPSHOT</version >
29
29
30
30
<properties >
31
31
<dataflow .version>[1.6.0, 1.99)</dataflow .version>
You can’t perform that action at this time.
0 commit comments