Skip to content

Commit 83d0402

Browse files
committed
Added JavaDirectKafkaWordCount example.
1 parent 26df23c commit 83d0402

File tree

2 files changed

+114
-1
lines changed

2 files changed

+114
-1
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming;
19+
20+
import java.util.HashMap;
21+
import java.util.HashSet;
22+
import java.util.Arrays;
23+
import java.util.regex.Pattern;
24+
25+
import scala.Tuple2;
26+
27+
import com.google.common.collect.Lists;
28+
import kafka.serializer.StringDecoder;
29+
30+
import org.apache.spark.SparkConf;
31+
import org.apache.spark.api.java.function.*;
32+
import org.apache.spark.streaming.api.java.*;
33+
import org.apache.spark.streaming.kafka.KafkaUtils;
34+
import org.apache.spark.streaming.Durations;
35+
36+
/**
37+
* Consumes messages from one or more topics in Kafka and does wordcount.
38+
* Usage: DirectKafkaWordCount <brokers> <topics>
39+
* <brokers> is a list of one or more Kafka brokers
40+
* <topics> is a list of one or more kafka topics to consume from
41+
*
42+
* Example:
43+
* $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
44+
*/
45+
46+
public final class JavaDirectKafkaWordCount {
47+
private static final Pattern SPACE = Pattern.compile(" ");
48+
49+
public static void main(String[] args) {
50+
if (args.length < 2) {
51+
System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
52+
" <brokers> is a list of one or more Kafka brokers\n" +
53+
" <topics> is a list of one or more kafka topics to consume from\n\n");
54+
System.exit(1);
55+
}
56+
57+
StreamingExamples.setStreamingLogLevels();
58+
59+
String brokers = args[0];
60+
String topics = args[1];
61+
62+
// Create context with 2 second batch interval
63+
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
64+
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
65+
66+
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
67+
HashMap<String, String> kafkaParams = new HashMap<String, String>();
68+
kafkaParams.put("metadata.broker.list", brokers);
69+
70+
// Create direct kafka stream with brokers and topics
71+
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
72+
jssc,
73+
String.class,
74+
String.class,
75+
StringDecoder.class,
76+
StringDecoder.class,
77+
kafkaParams,
78+
topicsSet
79+
);
80+
81+
// Get the lines, split them into words, count the words and print
82+
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
83+
@Override
84+
public String call(Tuple2<String, String> tuple2) {
85+
return tuple2._2();
86+
}
87+
});
88+
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
89+
@Override
90+
public Iterable<String> call(String x) {
91+
return Lists.newArrayList(SPACE.split(x));
92+
}
93+
});
94+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
95+
new PairFunction<String, String, Integer>() {
96+
@Override
97+
public Tuple2<String, Integer> call(String s) {
98+
return new Tuple2<String, Integer>(s, 1);
99+
}
100+
}).reduceByKey(
101+
new Function2<Integer, Integer, Integer>() {
102+
@Override
103+
public Integer call(Integer i1, Integer i2) {
104+
return i1 + i2;
105+
}
106+
});
107+
wordCounts.print();
108+
109+
// Start the computation
110+
jssc.start();
111+
jssc.awaitTermination();
112+
}
113+
}

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private HashSet<String> topicToSet(String topic) {
146146

147147
private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
148148
HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
149-
topicMap.put(new TopicAndPartition(topic, scala.Int.box(0)), offsetToStart);
149+
topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
150150
return topicMap;
151151
}
152152

0 commit comments

Comments
 (0)