
Commit 7a176df

[SPARK-2024] Add saveAsSequenceFile to PySpark
[SPARK-2024] More refactoring and documentation
[SPARK-2024] More Python API documentation
[SPARK-2024] Add programming guide
[SPARK-2024] Minor refactoring
[SPARK-2024] Adding key converter for hbase_inputformat.py
[SPARK-2024] Making Converter result type covariant
[SPARK-2024] Adding HBase output converter example
[SPARK-2024] Updates to TestInputFormat
[SPARK-2024] Adding standalone tests
[SPARK-2024] Style update
[SPARK-2024] Adding negative tests
[SPARK-2024] Adding a note for arrays
[SPARK-2024] Adding Cassandra output format example
[SPARK-2024] doc update
[SPARK-2024] API doc update
[SPARK-2024] more API doc update
[SPARK-2024] codecClass -> compressionCodecClass
[SPARK-2024] more API doc update
[SPARK-2024] more doc update
[SPARK-2024] fixing BytesWritable
[SPARK-2024] update programming guide
[SPARK-2024] addressing style checker complaints
[SPARK-2024] Reserialize with Pickle Serializer if needed
[SPARK-2024] Enable batch serialization to Python when reading sequence file
[SPARK-2024] fixing TestWritable(JavaBean) test failure
[SPARK-2024] Do not re-serialize if it is already in Pickle
[SPARK-2024] Allow users to specify batch size on reading
1 parent 8904791 commit 7a176df
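The user-visible result of the changes below is a SequenceFile save/load round trip in PySpark. A minimal sketch of that round trip, assuming a running SparkContext `sc` and an illustrative output path:

    # Saving goes through JavaToWritableConverter (added below): ints and
    # strings are wrapped as IntWritable and Text before writing.
    rdd = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
    rdd.saveAsSequenceFile("/tmp/seq-out")

    # Reading back goes through WritableToJavaConverter, which unwraps the
    # Writables into plain Python values.
    sorted(sc.sequenceFile("/tmp/seq-out").collect())
    # [(1, u'a'), (2, u'b'), (3, u'c')]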

14 files changed (+983 additions, -179 deletions)


core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 70 additions & 12 deletions
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.python
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
  * transformation code by overriding the convert method.
  */
 @Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+  def getInstance(converterClass: Option[String],
+                  defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
         val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,15 +51,17 @@ private[python] object Converter extends Logging {
         logError(s"Failed to load converter: $cc")
         throw err
       }
-    }.getOrElse { new DefaultConverter }
+    }.getOrElse { defaultConverter }
   }
 }
 
 /**
  * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
  * Other objects are passed through without conversion.
  */
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+    conf: Broadcast[SerializableWritable[Configuration]],
+    batchSize: Int) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
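Two things change in the hunk above: `Converter.getInstance` now takes its fallback converter explicitly instead of hard-coding `new DefaultConverter`, and `DefaultConverter` becomes `WritableToJavaConverter`, parameterized by a broadcast Hadoop configuration and a batch size. User-supplied converters are still loaded by reflection from a fully qualified class name, which PySpark passes as a plain string. A hedged sketch of that hand-off (the converter class names are hypothetical; the input format and key/value classes are standard HBase ones):

    # Converter.getInstance() instantiates these hypothetical classes by
    # reflection on the JVM side, falling back to the default converter
    # for whichever argument is omitted.
    hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter="com.example.MyKeyConverter",
        valueConverter="com.example.MyValueConverter",
        conf={"hbase.mapreduce.inputtable": "mytable"})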
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
       case fw: FloatWritable => fw.get()
       case t: Text => t.toString
       case bw: BooleanWritable => bw.get()
-      case byw: BytesWritable => byw.getBytes
+      case byw: BytesWritable =>
+        val bytes = new Array[Byte](byw.getLength)
+        System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+        bytes
       case n: NullWritable => null
-      case aw: ArrayWritable => aw.get().map(convertWritable(_))
-      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
-        (convertWritable(k), convertWritable(v))
-      }.toMap)
+      case aw: ArrayWritable =>
+        // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+        // Since we can't determine element types for empty arrays, we will not attempt to
+        // convert to primitive arrays (which get pickled to Python arrays). Users may want to
+        // write custom converters for arrays if they know the element types a priori.
+        aw.get().map(convertWritable(_))
+      case mw: MapWritable =>
+        val map = new java.util.HashMap[Any, Any]()
+        mw.foreach { case (k, v) =>
+          map.put(convertWritable(k), convertWritable(v))
+        }
+        map
+      case w: Writable =>
+        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
       case other => other
     }
   }
 
-  def convert(obj: Any): Any = {
+  override def convert(obj: Any): Any = {
     obj match {
       case writable: Writable =>
         convertWritable(writable)
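The new `case w: Writable` arm is where the `batchSize` constructor argument matters: Hadoop record readers reuse Writable instances, so when more than one record is serialized per batch, each otherwise-unhandled Writable must be cloned or every record in the batch would alias the last value read. Per the commit message ("Allow users to specify batch size on reading"), the batch size is also exposed on the Python read path; a sketch, assuming the `batchSize` keyword this commit adds:

    # Pickle 100 converted records per batch; any batchSize > 1 takes the
    # WritableUtils.clone(...) path shown above.
    rdd = sc.sequenceFile("/tmp/seq-out", batchSize=100)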
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
   }
 }
 
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+  /**
+   * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+   * supported out-of-the-box.
+   */
+  private def convertToWritable(obj: Any): Writable = {
+    import collection.JavaConversions._
+    obj match {
+      case i: java.lang.Integer => new IntWritable(i)
+      case d: java.lang.Double => new DoubleWritable(d)
+      case l: java.lang.Long => new LongWritable(l)
+      case f: java.lang.Float => new FloatWritable(f)
+      case s: java.lang.String => new Text(s)
+      case b: java.lang.Boolean => new BooleanWritable(b)
+      case aob: Array[Byte] => new BytesWritable(aob)
+      case null => NullWritable.get()
+      case map: java.util.Map[_, _] =>
+        val mapWritable = new MapWritable()
+        map.foreach { case (k, v) =>
+          mapWritable.put(convertToWritable(k), convertToWritable(v))
+        }
+        mapWritable
+      case other => throw new SparkException(
+        s"Data of type ${other.getClass.getName} cannot be used")
+    }
+  }
+
+  override def convert(obj: Any): Writable = obj match {
+    case writable: Writable => writable
+    case other => convertToWritable(other)
+  }
+}
+
 /** Utilities for working with Python objects <-> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
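`JavaToWritableConverter` is the save-side counterpart referenced in the doctest note above: boxed primitives, strings, byte arrays, `null` and (nested) maps are wrapped in their Writable equivalents, and anything else raises a `SparkException`. A sketch of a save that exercises the map-to-MapWritable path (the path is illustrative; the output format and Writable class names are standard Hadoop ones):

    # Dict values become MapWritable entries (DoubleWritable keys, Text
    # values); the int keys become IntWritable. Unsupported value types
    # would raise SparkException on the executor.
    rdd = sc.parallelize([(1, {3.0: u"bb"}), (2, {1.0: u"aa"})])
    rdd.saveAsNewAPIHadoopFile(
        "/tmp/newhadoop-out",
        "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
        keyClass="org.apache.hadoop.io.IntWritable",
        valueClass="org.apache.hadoop.io.MapWritable")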
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
 
   /**
    * Converts an RDD of key-value pairs, where key and/or value could be instances of
-   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
    */
   def convertRDD[K, V](rdd: RDD[(K, V)],
                        keyConverter: Converter[Any, Any],
