
Commit e8d3f2b

Merge pull request apache#502 from pwendell/clone-1
Remove Hadoop object cloning and warn users making Hadoop RDDs.

The code introduced in apache#359 used Hadoop's WritableUtils.clone() to duplicate objects when reading from Hadoop files. Some users have reported exceptions when cloning data in various file formats, including Avro and another custom format. This patch removes that functionality to ensure stability for the 0.9 release. Instead, it puts a clear warning in the documentation that copying may be necessary for Hadoop data sets.

(cherry picked from commit c319617)

Conflicts:
    core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
1 parent 7a62353 commit e8d3f2b
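
The documentation note added throughout this commit amounts to one pattern: copy each record out of the reused Writable before caching. A minimal sketch of that pattern in Scala (the `sc`, `path`, and LongWritable/Text key and value types are illustrative assumptions, not part of this commit):

import org.apache.hadoop.io.{LongWritable, Text}

// Hadoop's RecordReader hands back the same LongWritable/Text pair for every
// record, so caching this RDD directly would store many references to the
// same two objects.
val records = sc.sequenceFile(path, classOf[LongWritable], classOf[Text])

// Copy each record into immutable JVM values before caching.
val cached = records.map { case (k, v) => (k.get, v.toString) }.cache()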

File tree

5 files changed: +138 −116 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 81 additions & 46 deletions
@@ -341,7 +341,7 @@ class SparkContext(
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits, cloneRecords = false).map(pair => pair._2.toString)
+      minSplits).map(pair => pair._2.toString)
   }
 
   /**
@@ -354,33 +354,37 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopRDD[K: ClassTag, V: ClassTag](
+  def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
     ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K: ClassTag, V: ClassTag](
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
+  def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
     ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -403,16 +406,20 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -422,68 +429,91 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+    hadoopFile[K, V, F](path, defaultMinSplits)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneRecords: Boolean = true)
+      (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
-      vm.runtimeClass.asInstanceOf[Class[V]],
-      cloneRecords = cloneRecords)
+      vm.runtimeClass.asInstanceOf[Class[V]])
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration,
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
+  def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean = true
+      minSplits: Int
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
+      ): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +530,14 @@ class SparkContext(
    * have a parameterized singleton object). We use functions instead to create a new converter
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int = defaultMinSplits)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -511,7 +546,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
       kc.writableClass(km).asInstanceOf[Class[Writable]],
-      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
@@ -1024,7 +1059,7 @@ object SparkContext {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-    rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 48 additions & 4 deletions
@@ -137,7 +137,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
-  /**Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
   def sequenceFile[K, V](path: String,
     keyClass: Class[K],
     valueClass: Class[V],
@@ -148,7 +154,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
-  /**Get an RDD for a Hadoop SequenceFile. */
+  /** Get an RDD for a Hadoop SequenceFile.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -184,6 +196,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -201,6 +218,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -213,7 +235,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -226,7 +254,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -242,6 +276,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
     path: String,
@@ -257,6 +296,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
     conf: Configuration,
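
For jobs that still need the behaviour of the removed cloneRecords flag, the same effect can be recovered in user code with the Hadoop utility the old implementation called. A hedged sketch, assuming a SparkContext `sc`, an input `path`, and Text/BytesWritable records (none of which come from this commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, Text, WritableUtils}

// Deep-copy each reused Writable with WritableUtils.clone before caching.
// The Configuration is created once per partition so the closure does not
// have to serialize it.
val cloned = sc.sequenceFile(path, classOf[Text], classOf[BytesWritable])
  .mapPartitions { iter =>
    val conf = new Configuration()
    iter.map { case (k, v) =>
      (WritableUtils.clone(k, conf), WritableUtils.clone(v, conf))
    }
  }
  .cache()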
