
Commit 7aa592c

[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles
Solves two JIRAs in one shot:
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used by saveAsNewAPIHadoopFiles be Spark's Hadoop configuration

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.

(cherry picked from commit 8838ad7)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 6371737 commit 7aa592c
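For context, a minimal standalone sketch (not part of the commit) of the pattern the fix relies on: Hadoop's Configuration and JobConf are not java.io.Serializable, so a ForeachDStream whose save closure captures one directly cannot be written to a checkpoint. Wrapping the object in Spark's SerializableWritable, which serializes it through Hadoop's Writable machinery, makes such a closure checkpoint-safe. The configuration key below is purely illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

object SerializableConfSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    hadoopConf.set("example.key", "example.value") // hypothetical setting, for illustration only

    // Wrap the (non-Serializable) Configuration so closures that capture it
    // can be Java-serialized, e.g. when the DStream graph is checkpointed.
    val wrappedConf = new SerializableWritable(hadoopConf)

    val saveFunc = () => {
      // Inside the closure, recover the underlying Configuration with .value
      val conf: Configuration = wrappedConf.value
      println(conf.get("example.key"))
    }
    saveFunc()
  }
}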

2 files changed: +70, −16 lines

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 16 additions & 14 deletions
@@ -17,20 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -590,11 +587,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -621,11 +620,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
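For reference, a minimal usage sketch (assumed application setup, not from this commit) of the API touched above: after this change, the conf parameter can be omitted and defaults to the SparkContext's Hadoop configuration, and enabling checkpointing no longer fails when serializing the ForeachDStream that saveAsNewAPIHadoopFiles registers. The host, port, and paths below are placeholders.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SaveFilesSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("save-files-sketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint("/tmp/checkpoint-dir") // placeholder checkpoint directory

    // Word counts, converted to Hadoop Writables before writing.
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .map { case (word, count) => (new Text(word), new IntWritable(count)) }

    // With this fix, omitting `conf` picks up ssc.sparkContext.hadoopConfiguration,
    // and the registered ForeachDStream survives checkpoint serialization.
    counts.saveAsNewAPIHadoopFiles(
      "/tmp/output/counts", "txt",
      classOf[Text], classOf[IntWritable],
      classOf[NewTextOutputFormat[Text, IntWritable]])

    ssc.start()
    ssc.awaitTermination()
  }
}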

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 54 additions & 2 deletions
@@ -22,9 +22,14 @@ import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -392,7 +442,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }
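The two new tests above exercise driver recovery: the DStream graph, including the save output streams, must round-trip through the checkpoint. A minimal sketch (assumed paths and a trivial graph, not from this commit) of the corresponding recovery pattern in an application:

import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverySketch {
  def createContext(): StreamingContext = {
    val ssc = new StreamingContext("local[2]", "recovery-sketch", Seconds(1))
    ssc.checkpoint("/tmp/checkpoint-dir") // placeholder checkpoint directory
    // Hypothetical graph; a real job would register its saveAsHadoopFiles /
    // saveAsNewAPIHadoopFiles output streams here.
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Fresh start: createContext() builds the graph and the context is checkpointed.
    // Restart after failure: the graph (now serializable) is read back from the checkpoint.
    val ssc = StreamingContext.getOrCreate("/tmp/checkpoint-dir", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}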
