
Commit 94d1f46

kanzhang authored and JoshRosen committed
[SPARK-2024] Add saveAsSequenceFile to PySpark
JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a followup to #455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormats.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both old and new MapReduce APIs.
* Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
* No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to ```Object[]``` and they get pickled to Python tuples.
* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <kzhang@apache.org>

Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
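As a quick orientation to the new API, here is a minimal usage sketch, not taken from this commit: it assumes a local master, a made-up output path, and tiny sample data; only the `saveAsSequenceFile` method added here and the `sequenceFile` reader from #455 are actual API names.

```python
# Minimal sketch of the new save/read round trip; path and data are hypothetical.
from pyspark import SparkContext

sc = SparkContext("local", "SequenceFileExample")

# A pair RDD of common types; the default converter maps the Python ints and strs
# to Writable key/value types when the SequenceFile is written.
rdd = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
rdd.saveAsSequenceFile("/tmp/spark-2024-demo")

# Read it back with the SequenceFile reader added in #455.
print(sorted(sc.sequenceFile("/tmp/spark-2024-demo").collect()))

sc.stop()
```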
1 parent 437dc8c commit 94d1f46

14 files changed (+1085, -186 lines)


core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 70 additions & 12 deletions
```diff
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.python
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
  * transformation code by overriding the convert method.
  */
 @Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+  def getInstance(converterClass: Option[String],
+                  defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
         val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,15 +51,17 @@ private[python] object Converter extends Logging {
           logError(s"Failed to load converter: $cc")
           throw err
       }
-    }.getOrElse { new DefaultConverter }
+    }.getOrElse { defaultConverter }
   }
 }
 
 /**
  * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
  * Other objects are passed through without conversion.
  */
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+    conf: Broadcast[SerializableWritable[Configuration]],
+    batchSize: Int) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
       case fw: FloatWritable => fw.get()
       case t: Text => t.toString
       case bw: BooleanWritable => bw.get()
-      case byw: BytesWritable => byw.getBytes
+      case byw: BytesWritable =>
+        val bytes = new Array[Byte](byw.getLength)
+        System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+        bytes
       case n: NullWritable => null
-      case aw: ArrayWritable => aw.get().map(convertWritable(_))
-      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
-        (convertWritable(k), convertWritable(v))
-      }.toMap)
+      case aw: ArrayWritable =>
+        // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+        // Since we can't determine element types for empty arrays, we will not attempt to
+        // convert to primitive arrays (which get pickled to Python arrays). Users may want
+        // write custom converters for arrays if they know the element types a priori.
+        aw.get().map(convertWritable(_))
+      case mw: MapWritable =>
+        val map = new java.util.HashMap[Any, Any]()
+        mw.foreach { case (k, v) =>
+          map.put(convertWritable(k), convertWritable(v))
+        }
+        map
+      case w: Writable =>
+        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
       case other => other
     }
   }
 
-  def convert(obj: Any): Any = {
+  override def convert(obj: Any): Any = {
     obj match {
       case writable: Writable =>
         convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
   }
 }
 
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+  /**
+   * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+   * supported out-of-the-box.
+   */
+  private def convertToWritable(obj: Any): Writable = {
+    import collection.JavaConversions._
+    obj match {
+      case i: java.lang.Integer => new IntWritable(i)
+      case d: java.lang.Double => new DoubleWritable(d)
+      case l: java.lang.Long => new LongWritable(l)
+      case f: java.lang.Float => new FloatWritable(f)
+      case s: java.lang.String => new Text(s)
+      case b: java.lang.Boolean => new BooleanWritable(b)
+      case aob: Array[Byte] => new BytesWritable(aob)
+      case null => NullWritable.get()
+      case map: java.util.Map[_, _] =>
+        val mapWritable = new MapWritable()
+        map.foreach { case (k, v) =>
+          mapWritable.put(convertToWritable(k), convertToWritable(v))
+        }
+        mapWritable
+      case other => throw new SparkException(
+        s"Data of type ${other.getClass.getName} cannot be used")
+    }
+  }
+
+  override def convert(obj: Any): Writable = obj match {
+    case writable: Writable => writable
+    case other => convertToWritable(other)
+  }
+}
+
 /** Utilities for working with Python objects <-> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
 
   /**
    * Converts an RDD of key-value pairs, where key and/or value could be instances of
-   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
    */
   def convertRDD[K, V](rdd: RDD[(K, V)],
                        keyConverter: Converter[Any, Any],
```
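Tying the diff back to the Python API: `Converter.getInstance` above resolves converter class names that PySpark passes down from the new save methods. The sketch below shows that wiring under stated assumptions: an existing `SparkContext` named `sc`, a made-up output path, and the `DoubleArrayWritable` / `DoubleArrayToWritableConverter` classes referenced in the Javadoc above.

```python
# Sketch: supplying a custom converter class name from PySpark. The converter is
# resolved on the JVM side by Converter.getInstance (see the diff above).
# Assumes an existing SparkContext `sc`; path and data are hypothetical.
from array import array

rdd = sc.parallelize([(1, array('d', [1.0, 2.0])), (2, array('d', [3.0, 4.0]))])
rdd.saveAsNewAPIHadoopFile(
    "/tmp/spark-2024-arrays",
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
```

Without such a converter, array values would hit the "no out-of-box support for reading/writing arrays" limitation described in the commit message.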
