This repository was archived by the owner on Apr 23, 2025. It is now read-only.

Commit 00182b3

Author: Matt Stump
Commit message: modify to use kafka
Parent: aa80bfc

File tree (2 files changed, +64 -4 lines):

build.sbt
src/main/scala/Test.scala

build.sbt

Lines changed: 4 additions & 2 deletions

@@ -1,12 +1,14 @@
 name := "SparkStreamingAggregation"
 
-version := "0.1"
+version := "0.2"
 
 scalaVersion := "2.10.4"
 
 libraryDependencies ++= Seq(
+  "org.apache.kafka" % "kafka_2.10" % "0.8.0",
   "org.apache.spark" %% "spark-core" % "1.1.0",
   "org.apache.spark" %% "spark-streaming" % "1.1.0",
   "org.apache.spark" %% "spark-sql" % "1.1.0",
-  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha2"
+  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0",
+  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"
 )
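
For reference, build.sbt after this patch reads roughly as follows (the unchanged lines are taken from the diff context above). Note that kafka_2.10 is a plain Java-style dependency pinned to match the project's scalaVersion, while the Spark artifacts use %% to select the Scala binary version automatically:

name := "SparkStreamingAggregation"

version := "0.2"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Kafka client itself, pinned to the Scala 2.10 build to match scalaVersion
  "org.apache.kafka" % "kafka_2.10" % "0.8.0",
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-streaming" % "1.1.0",
  "org.apache.spark" %% "spark-sql" % "1.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0",
  // Kafka receiver for Spark Streaming, versioned with the other Spark artifacts
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"
)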

src/main/scala/Test.scala

Lines changed: 60 additions & 2 deletions

@@ -7,6 +7,9 @@
 // 2014-10-07T12:22:10Z;foo;1
 // 2014-10-07T12:23:11Z;foo;29
 
+import java.util.Properties
+import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming._
@@ -21,6 +24,9 @@ import com.datastax.spark.connector.cql.CassandraConnector
 import com.datastax.driver.core.ConsistencyLevel
 import com.datastax.driver.core.utils.UUIDs
 
+// import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+// import org.apache.spark.SparkConf
 
 object Test {
 
@@ -83,11 +89,30 @@ object Test {
 
     // for testing purposes you can use the alternative input below
     // val input = sc.parallelize(sampleRecords)
-    val input = ssc.socketTextStream("localhost", 9999)
+    // val input = ssc.socketTextStream("localhost", 9999)
+    // val Array(zkQuorum, group, topics, numThreads) = args
+
+    val zkQuorum = "localhost:2181"
+    val inputTopic = "events"
+
+    val kafkaParams = Map(
+      "zk.connect" -> "127.0.0.1:2181",
+      "zookeeper.connect" -> "localhost:2181",
+      "zookeeper.connection.timeout.ms" -> "1000",
+      "group.id" -> "spark-streaming-test")
+
+    val input = KafkaUtils.createStream(
+      ssc,
+      zkQuorum,
+      "spark-streaming-test",
+      Map(inputTopic -> 1)).map(_._2)
+
+    input.print()
+
     val parsedRecords = input.map(parseMessage)
     val bucketedRecords = parsedRecords.map(record => ((record.bucket, record.name), record))
     val bucketedCounts = bucketedRecords.combineByKey(
-      (record) => record.count,
+      (record:Record) => record.count,
       (count:Long, record:Record) => (count + record.count),
       (c1:Long, c2:Long) => (c1 + c2),
       new HashPartitioner(1))
@@ -102,3 +127,36 @@ object Test {
     ssc.awaitTermination()
   }
 }
+
+
+object KafkaEventProducer {
+
+  def main(args: Array[String]) {
+
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaEventProducer <metadataBrokerList> <topic> <messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Kafka producer connection properties
+    val props = new Properties()
+    props.put("metadata.broker.list", brokers)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // message format: "2014-10-07T12:20:08Z;foo;<count>"
+
+    // Send some messages
+    while(true) {
+      val eventCount = scala.util.Random.nextInt(10).toString()
+      val eventString = "2014-10-07T12:20:08Z;foo;" + eventCount
+      producer.send(new KeyedMessage[String, String](topic, eventString))
+      Thread.sleep(100)
+    }
+  }
+
+}
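
As committed, kafkaParams is defined but never passed to Kafka: the simple createStream(ssc, zkQuorum, groupId, topics) overload used above takes the Zookeeper quorum and consumer group id as plain strings. A minimal sketch (not part of this commit, shown for illustration) of the Spark 1.1.0 overload that does consume a parameter map, with explicit decoders and an explicit storage level:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Consumes the kafkaParams map ("zookeeper.connect", "group.id", ...)
// instead of separate zkQuorum/groupId string arguments.
val input = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc,                                // existing StreamingContext
  kafkaParams,                        // consumer configuration map
  Map(inputTopic -> 1),               // topic -> number of receiver threads
  StorageLevel.MEMORY_AND_DISK_SER_2  // default used by the simple overload
).map(_._2)                           // keep only the message payloads

With a Kafka broker on localhost:9092 (an assumption, not shown in this repo), the test producer can be started from sbt:

sbt "runMain KafkaEventProducer localhost:9092 events 10 5"

Note that the <messagesPerSec> and <wordsPerMessage> arguments are parsed but not yet used; the loop simply sends one message every 100 ms.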
