@@ -4,26 +4,28 @@ KafkaIO provides unbounded sources and sinks for [Kafka](https://www.firebase.co
4
4
topics. Kafka version 0.9 and above are supported.
5
5
6
6
## Basic Usage
7
- * Read from a topic with 8 byte long keys and string values:
8
- ``` java
9
- PCollection<KV<Long , String > > kafkaRecords =
10
- pipeline
11
- .applY(KafkaIO . read()
12
- .withBootstrapServers(" broker_1:9092,broker_2:9092" )
13
- .withTopics(ImmutableList . of(" topic_a" ))
14
- .withKeyCoder(BigEndianLongCoder . of())
15
- .withValueCoder(StringUtf8Coder . of())
16
- .withoutMetadata()
17
- );
18
- ```
19
- * Write the same PCollection to a Kafka topic:
20
- ```java
21
- kafkaRecords. apply(KafkaIO . write()
7
+
8
+ * Read from a topic with 8 byte long keys and string values:
9
+ ``` java
10
+ PCollection<KV<Long , String > > kafkaRecords =
11
+ pipeline
12
+ .applY(KafkaIO . read()
22
13
.withBootstrapServers(" broker_1:9092,broker_2:9092" )
23
- .withTopic( " results " )
14
+ .withTopics( ImmutableList . of( " topic_a " ) )
24
15
.withKeyCoder(BigEndianLongCoder . of())
25
16
.withValueCoder(StringUtf8Coder . of())
26
- ```
17
+ .withoutMetadata()
18
+ );
19
+ ```
20
+
21
+ * Write the same PCollection to a Kafka topic:
22
+ ``` java
23
+ kafkaRecords. apply(KafkaIO . write()
24
+ .withBootstrapServers(" broker_1:9092,broker_2:9092" )
25
+ .withTopic(" results" )
26
+ .withKeyCoder(BigEndianLongCoder . of())
27
+ .withValueCoder(StringUtf8Coder . of())
28
+ ```
27
29
28
30
Please see JavaDoc for KafkaIO in
29
31
[KafkaIO . java](https: // github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100)
0 commit comments