File tree Expand file tree Collapse file tree 1 file changed +36
-0
lines changed Expand file tree Collapse file tree 1 file changed +36
-0
lines changed Original file line number Diff line number Diff line change
1
+ # KafkaIO : Dataflow Unbounded Source and Sink for Kafka Topics
2
+
3
+ KafkaIO provides unbounded sources and sinks for [ Kafka] ( https://www.firebase.com/ )
4
+ topics. Kafka version 0.9 and above are supported.
5
+
6
+ ## Basic Usage
7
+ * Read from a topic with 8 byte long keys and string values:
8
+ ``` java
9
+ PCollection<KV<Long , String > > kafkaRecords =
10
+ pipeline
11
+ .applY(KafkaIO . read()
12
+ .withBootstrapServers(" broker_1:9092,broker_2:9092" )
13
+ .withTopics(ImmutableList . of(" topic_a" ))
14
+ .withKeyCoder(BigEndianLongCoder . of())
15
+ .withValueCoder(StringUtf8Coder . of())
16
+ .withoutMetadata()
17
+ );
18
+ ```
19
+ * Write the same PCollection to a Kafka topic:
20
+ ```java
21
+ kafkaRecords. apply(KafkaIO . write()
22
+ .withBootstrapServers(" broker_1:9092,broker_2:9092" )
23
+ .withTopic(" results" )
24
+ .withKeyCoder(BigEndianLongCoder . of())
25
+ .withValueCoder(StringUtf8Coder . of())
26
+ ```
27
+
28
+ Please see JavaDoc for KafkaIO in
29
+ [KafkaIO . java](https: // github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100)
30
+ for complete documentation and a more descriptive usage example.
31
+
32
+ ## Release Notes
33
+ * ** 0.2 . 0 ** : Assign one split for each of the Kafka topic partitions. This makes Dataflow
34
+ [Update ](https: // cloud.google.com/dataflow/pipelines/updating-a-pipeline)
35
+ from previous version incompatible.
36
+ * ** 0.1 . 0 ** : KafkaIO with support for Unbounded Source and Sink .
You can’t perform that action at this time.
0 commit comments