[SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD #1598
Changes from all commits
@@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.language.existentials
 import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Try, Success, Failure}

 import net.razorvine.pickle.{Pickler, Unpickler}
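The only code change in this hunk widens the scala.util import to bring Success and Failure into scope, presumably so later code in this patch can pattern match on Try results. A minimal standalone sketch of that idiom (the function and default value are illustrative, not from the patch):

import scala.util.{Try, Success, Failure}

object TryIdiomSketch {
  // Hypothetical example: safely parse an integer by matching on the Try
  // result instead of calling .get and risking an exception.
  def parseOrDefault(s: String, default: Int): Int =
    Try(s.toInt) match {
      case Success(n) => n
      case Failure(_) => default
    }

  def main(args: Array[String]): Unit = {
    println(parseOrDefault("42", 0))   // prints 42
    println(parseOrDefault("nope", 0)) // prints 0
  }
}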
@@ -536,25 +536,6 @@ private[spark] object PythonRDD extends Logging {
     file.close()
   }

-  /**
-   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
-   * It is only used by pyspark.sql.
-   * TODO: Support more Python types.
-   */
-  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        unpickle.loads(row) match {
-          // in case objects are pickled in batch mode
-          case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
-          // not in batch mode
-          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
-        }
-      }
-    }
-  }
-
   private def getMergedConf(confAsMap: java.util.HashMap[String, String],
       baseConf: Configuration): Configuration = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
@@ -701,6 +682,54 @@ private[spark] object PythonRDD extends Logging {
     }
   }

+  /**
+   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
+   * This function is outdated; PySpark does not use it anymore.
+   */
+  @deprecated
+  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
Review comment: Perhaps it is too late if this was already public API... but should this be marked …
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        unpickle.loads(row) match {
+          // in case objects are pickled in batch mode
+          case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
Review comment: In this case, it's actually fine to check for a list in order to detect batching, because we're not expecting to find an RDD of lists.
+          // not in batch mode
+          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
+        }
+      }
+    }
+  }
+
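The batch/non-batch dispatch above relies on PySpark's batch serializer pickling a list of rows per record. Here is a standalone sketch of that dispatch outside Spark, using Pyrolite's Pickler/Unpickler directly (the demo object and sample row are illustrative, not from the patch):

import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}

import scala.collection.JavaConversions._

import net.razorvine.pickle.{Pickler, Unpickler}

object BatchDetectionSketch {
  def main(args: Array[String]): Unit = {
    val pickler = new Pickler
    val unpickler = new Unpickler

    // One "row", as a Java map (what a pickled Python dict unpickles to).
    val row = new JHashMap[String, Any]()
    row.put("name", "alice")

    // A batched record: a list of rows pickled together.
    val batch = new JArrayList[JMap[String, Any]]()
    batch.add(row)

    // Same dispatch as pythonToJavaMap: a list signals a batch, a bare map
    // signals a single row. Safe only because rows are never lists here.
    def convert(bytes: Array[Byte]): Seq[Map[String, _]] =
      unpickler.loads(bytes) match {
        case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
        case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
      }

    println(convert(pickler.dumps(batch))) // batched: one record, one row inside
    println(convert(pickler.dumps(row)))   // not batched: a single row
  }
}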
+  /**
+   * Convert an RDD of serialized Python tuples to Arrays (no recursive conversions).
+   * It is only used by pyspark.sql.
+   */
+  def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
Review comment: The whole PythonRDD is private, so does it still need this?
Reply: Ah, I did not realize that. It could still perhaps be marked protected (to prevent other Spark users from depending on it directly), but that's not as big of a deal.
+
+    def toArray(obj: Any): Array[_] = {
+      obj match {
+        case objs: JArrayList[_] =>
+          objs.toArray
+        case obj if obj.getClass.isArray =>
+          obj.asInstanceOf[Array[_]].toArray
+      }
+    }
+
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]].map(toArray)
+        } else {
+          Seq(toArray(obj))
+        }
+      }
+    }.toJavaRDD()
+  }
+
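The toArray helper above accepts both a JArrayList and a native Java array because, in Pyrolite's type mapping as I understand it, a pickled Python list unpickles to a java.util.ArrayList while a pickled Python tuple unpickles to an Object[] array. A standalone sketch of that distinction (the demo object is illustrative, not from the patch):

import java.util.{ArrayList => JArrayList}

import net.razorvine.pickle.{Pickler, Unpickler}

object TupleVsListSketch {
  def main(args: Array[String]): Unit = {
    val pickler = new Pickler
    val unpickler = new Unpickler

    // An Object[] round-trips through a Python tuple and comes back as an array.
    val fromTuple = unpickler.loads(pickler.dumps(Array[AnyRef]("a", Int.box(1))))
    println(fromTuple.getClass.isArray) // expected: true

    // A Java list round-trips through a Python list and comes back as an ArrayList.
    val list = new JArrayList[AnyRef]()
    list.add("a")
    val fromList = unpickler.loads(pickler.dumps(list))
    println(fromList.isInstanceOf[JArrayList[_]]) // expected: true
  }
}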
   /**
    * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
    * PySpark.
Review comment: If this function is outdated should we mark it @deprecated?
Reply: Sorry, this was never public API; maybe we should just delete it instead of deprecating it?
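For reference, Scala's @deprecated annotation can carry a message and a since-version that the compiler surfaces to callers, which gives more guidance than the bare annotation used in the patch above. A minimal sketch (names, message, and version are illustrative, not from the patch):

object DeprecationSketch {
  // The message tells callers what to do; the version records when it changed.
  @deprecated("no longer used by PySpark", "1.1.0")
  def oldConversion(): Unit = ()

  // Compiling this call emits a deprecation warning that includes the message.
  def caller(): Unit = oldConversion()
}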