
Commit a9944c8

[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles
Solves two JIRAs in one shot:
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used by saveAsNewAPIHadoopFiles be Spark's Hadoop configuration

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.

(cherry picked from commit 8838ad7)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 1e356a8 commit a9944c8
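
The essence of the fix, as a minimal sketch (not the patch itself): Hadoop's Configuration/JobConf is a Writable but not java.io.Serializable, so capturing it directly in the save closure makes the resulting ForeachDStream non-serializable when the DStream graph is checkpointed; wrapping it in Spark's SerializableWritable before it is captured avoids that. The saveToPath helper below is illustrative only.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

val hadoopConf = new Configuration()
// Wrap before capture: SerializableWritable serializes the underlying Writable,
// so a closure holding it (and the DStream holding the closure) can be checkpointed.
val serializableConf = new SerializableWritable(hadoopConf)

// Illustrative closure; the real patch passes serializableConf.value to rdd.saveAs*HadoopFile.
val saveToPath = (path: String) => {
  val conf = serializableConf.value   // unwrap only at the point of use
  // ... write to `path` using `conf` ...
}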

2 files changed: 70 additions, 16 deletions

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 16 additions & 14 deletions

@@ -17,20 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -625,11 +622,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -656,11 +655,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
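
For context, a hedged usage sketch of the patched API from a checkpointed application; the batch interval, socket source, and paths below are placeholders. With this change the conf argument can be omitted and the save picks up ssc.sparkContext.hadoopConfiguration.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SaveFilesExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("SaveFilesExample"), Seconds(1))
    // Checkpointing is what requires the ForeachDStream created below to be serializable.
    ssc.checkpoint("/tmp/checkpoint")                        // placeholder directory

    val counts = ssc.socketTextStream("localhost", 9999)     // placeholder source
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map { case (word, count) => (new Text(word), new IntWritable(count)) }

    // Default conf is now the SparkContext's Hadoop configuration.
    counts.saveAsNewAPIHadoopFiles(
      "/tmp/output/wordcounts", "txt",                       // placeholder prefix/suffix
      classOf[Text], classOf[IntWritable],
      classOf[NewTextOutputFormat[Text, IntWritable]])

    ssc.start()
    ssc.awaitTermination()
  }
}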

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 54 additions & 2 deletions

@@ -22,9 +22,14 @@ import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
    Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }
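
The new tests drive recovery through the suite's harness; in an application the same path is exercised with StreamingContext.getOrCreate, sketched below with a placeholder checkpoint directory and an illustrative createContext helper. It is this restore from checkpoint that previously failed because the ForeachDStream created by saveAsHadoopFiles / saveAsNewAPIHadoopFiles could not be serialized.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/checkpoint"   // placeholder; use HDFS or another fault-tolerant store

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("Recovery"), Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... build the DStream graph here, including a saveAs*HadoopFiles(...) call ...
  ssc
}

// Cold start: builds a fresh context. After a failure: deserializes the checkpointed
// graph, including the ForeachDStream created by the save operation.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()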
