Skip to content

Commit 77c9a66

Browse files
committed
fix build
1 parent f1a6696 commit 77c9a66

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,16 @@ object StatefulNetworkWordCount {
5959
val wordDstream = words.map(x => (x, 1))
6060

6161
// Update the cumulative count using updateStateByKey
62-
// This will give a Dstream made of state (which is the cumulative count of the words)
63-
64-
val trackStateFunc = (word: String, one: Option[Int], state: State[Int]) => {
65-
val sum = one.getOrElse(0) + state.getOrElse(0)
62+
// This will give a DStream made of state (which is the cumulative count of the words)
63+
val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
64+
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
6665
val output = (word, sum)
6766
state.update(sum)
6867
Some(output)
6968
}
7069

7170
val stateDstream = wordDstream.trackStateByKey(
72-
StateSpec(trackStateFunc).initialState(initialRDD))
71+
StateSpec.function(trackStateFunc).initialState(initialRDD))
7372
stateDstream.print()
7473
ssc.start()
7574
ssc.awaitTermination()

0 commit comments

Comments
 (0)