Skip to content

[SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD #1598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 27 commits into from

Conversation

davies
Copy link
Contributor

@davies davies commented Jul 26, 2014

Convert Row in JavaSchemaRDD into Array[Any] and unpickle them as tuple in Python, then convert them into namedtuple, so use can access fields just like attributes.

This will let nested structure can be accessed as object, also it will reduce the size of serialized data and better performance.

root
|-- field1: integer (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: integer (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = false)
|-- field6: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- field7: string (nullable = true)

Then we can access them by row.field3.field5[0] or row.field6[5].field7

It also will infer the schema in Python, convert Row/dict/namedtuple/objects into tuple before serialization, then call applySchema in JVM. During inferSchema(), the top level of dict in row will be StructType, but any nested dictionary will be MapType.

You can use pyspark.sql.Row to convert unnamed structure into Row object, make the RDD can be inferable. Such as:

ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1]))

Or you could use Row to create a class just like namedtuple, for example:

Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))

Also, you can call applySchema to apply an schema to a RDD of tuple/list and turn it into a SchemaRDD. The schema should be StructType, see the API docs for details.

schema = StructType([StructField("name, StringType, True),
StructType("age", IntegerType, True)])
ctx.applySchema(rdd, schema)

PS: In order to use namedtuple to inferSchema, you should make namedtuple picklable.

@SparkQA
Copy link

SparkQA commented Jul 26, 2014

QA tests have started for PR 1598. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17211/consoleFull

@marmbrus
Copy link
Contributor

Can you add [SQL] to these PRs as well?

@SparkQA
Copy link

SparkQA commented Jul 26, 2014

QA results for PR 1598:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class List(list):
class Dict(dict):
class Row(tuple):
class Row(tuple):

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17211/consoleFull

@davies davies changed the title [WIP] [SPARK-2010] [PySpark] support nested structure in SchemaRDD [WIP] [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD Jul 26, 2014
@yhuai
Copy link
Contributor

yhuai commented Jul 29, 2014

With this PR, what does a StructType represent? namedtuple or array? Do we still keep the Row class in PySpark?

@davies
Copy link
Contributor Author

davies commented Jul 29, 2014

A StructType is presented as an namedtuple in Python, which is called Row.

The Row is generated according schema, there is no predefined Row class, so it's better to keep it internal.

Conflicts:
	python/pyspark/sql.py
	sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA tests have started for PR 1598. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17397/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA tests have started for PR 1598. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17399/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA results for PR 1598:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class List(list):
class Dict(dict):
class Row(tuple):
class Row(tuple):

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17397/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA results for PR 1598:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class List(list):
class Dict(dict):
class Row(tuple):
class Row(tuple):

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17399/consoleFull

@davies davies changed the title [WIP] [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD (part 1) Jul 30, 2014
@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA tests have started for PR 1598. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17488/consoleFull

@davies
Copy link
Contributor Author

davies commented Jul 30, 2014

@yhuai @marmbrus @mateiz plz take a look at this, thx!

>>> srdd2.collect()
[Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None), \
Row(f1=2, f2=None, f3=Row(field4=22, field5=[10, 11]), f4=[Row(field7=u'row2')]), \
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking the doc comment like this is kind of weird; could you instead do a for r in srdd2.collect(): print r and get one per line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea!

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA results for PR 1598:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class DataType(object):
class PrimitiveType(DataType):
class StringType(PrimitiveType):
class BinaryType(PrimitiveType):
class BooleanType(PrimitiveType):
class TimestampType(PrimitiveType):
class DecimalType(PrimitiveType):
class DoubleType(PrimitiveType):
class FloatType(PrimitiveType):
class ByteType(PrimitiveType):
class IntegerType(PrimitiveType):
class LongType(PrimitiveType):
class ShortType(PrimitiveType):
class ArrayType(DataType):
class MapType(DataType):
class StructField(DataType):
class StructType(DataType):
class List(list):
class Dict(dict):
class Row(tuple):
class Row(tuple):

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17488/consoleFull

cls = _create_cls(self.schema())
return map(cls, rows)

# convert Row in JavaSchemaRDD into namedtuple, let access fields easier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should expand this comment a bit, e.g. "Convert each object in the RDD to a Row with the right class for this SchemaRDD, so that fields can be accessed as attributes." Also this needs to appear in some kind of class comment at the top, e.g. say "This class receives raw tuples from Java but assigns a class to it in all its data-collection methods (mapPartitionsWithIndex, collect, take, etc) so that PySpark sees them as Row objects with named fields".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx

@mateiz
Copy link
Contributor

mateiz commented Jul 30, 2014

Made some comments on it from the Python side. @JoshRosen you may also want to take a look at the named tuple / class generation stuff here.



class StructType(object):
class StructType(DataType):
"""Spark SQL StructType

The data type representing namedtuple values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change it to The data type representing rows.?

* Convert an RDD of serialized Python tuple to Array (no recursive conversions).
* It is only used by pyspark.sql.
*/
def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[spark]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole PythonRDD is private, so does it still need this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I did not realize that. It could still perhaps be marked protected (to prevent other spark users from depending on it directly), but thats not as big of a deal.

... double=1.0, long=1L, boolean=True, list=[1, 2, 3],
... time=datetime(2010, 1, 1, 1, 1, 1), dict={"a": 1})])
>>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string,
... x.double, x.long, x.boolean, x.time, x.dict["a"], x.list))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to also add a SQL test here to make sure that types are matching up with those expected in the execution engine. (though we might change the names to avoid conflict with reserved words, as we have not implemented identifier escaping). In particular the complex nested ones like dict and list. Also it would be good to add a nested Row to the input types.

Something like:

srdd.registerAsTable("pythonData")
sqlCtx.sql("SELECT dict['a'], list[0], nested.nestedField").collect() ...

@marmbrus
Copy link
Contributor

marmbrus commented Aug 1, 2014

This is looking really good to me! I'm very excited to have much more complete support for SQL in pyspark. A few minor comments on docs and testing, but I think we can merge this soon.

JMapWrapper(converted)
case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) => c.map {
case (key, value) => (convert(key, keyType), convert(value, valueType))
}.toMap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the part of case (c: java.util.Map[_, _], struct: StructType) as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will case (c: java.util.Map[_, _], struct: StructType) happen with your change? How do we handle inner structs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Row() in python will be convert into tuple(), so It's fine to remove this case.

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1598. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17700/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1598. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17703/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1598:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class DataType(object):
class PrimitiveType(DataType):
class StringType(PrimitiveType):
class BinaryType(PrimitiveType):
class BooleanType(PrimitiveType):
class TimestampType(PrimitiveType):
class DecimalType(PrimitiveType):
class DoubleType(PrimitiveType):
class FloatType(PrimitiveType):
class ByteType(PrimitiveType):
class IntegerType(PrimitiveType):
class LongType(PrimitiveType):
class ShortType(PrimitiveType):
class ArrayType(DataType):
class MapType(DataType):
class StructField(DataType):
class StructType(DataType):
class List(list):
class Dict(dict):
class Row(tuple):
class Row(tuple):

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17700/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1598:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class DataType(object):
class PrimitiveType(DataType):
class StringType(PrimitiveType):
class BinaryType(PrimitiveType):
class BooleanType(PrimitiveType):
class TimestampType(PrimitiveType):
class DecimalType(PrimitiveType):
class DoubleType(PrimitiveType):
class FloatType(PrimitiveType):
class ByteType(PrimitiveType):
class IntegerType(PrimitiveType):
class LongType(PrimitiveType):
class ShortType(PrimitiveType):
class ArrayType(DataType):
class MapType(DataType):
class StructField(DataType):
class StructType(DataType):
class List(list):
class Dict(dict):
class Row(tuple):
class Row(tuple):

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17703/consoleFull

@marmbrus
Copy link
Contributor

marmbrus commented Aug 2, 2014

Thanks for working on this! I've merge it to master.

@asfgit asfgit closed this in 880eabe Aug 2, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Convert Row in JavaSchemaRDD into Array[Any] and unpickle them as tuple in Python, then convert them into namedtuple, so use can access fields just like attributes.

This will let nested structure can be accessed as object, also it will reduce the size of serialized data and better performance.

root
 |-- field1: integer (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 |    |-- field4: integer (nullable = true)
 |    |-- field5: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field7: string (nullable = true)

Then we can access them by row.field3.field5[0]  or row.field6[5].field7

It also will infer the schema in Python, convert Row/dict/namedtuple/objects into tuple before serialization, then call applySchema in JVM. During inferSchema(), the top level of dict in row will be StructType, but any nested dictionary will be MapType.

You can use pyspark.sql.Row to convert unnamed structure into Row object, make the RDD can be inferable. Such as:

ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1]))

Or you could use Row to create a class just like namedtuple, for example:

Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))

Also, you can call applySchema to apply an schema to a RDD of tuple/list and turn it into a SchemaRDD. The `schema` should be StructType, see the API docs for details.

schema = StructType([StructField("name, StringType, True),
                                    StructType("age", IntegerType, True)])
ctx.applySchema(rdd, schema)

PS: In order to use namedtuple to inferSchema, you should make namedtuple picklable.

Author: Davies Liu <davies.liu@gmail.com>

Closes apache#1598 from davies/nested and squashes the following commits:

f1d15b6 [Davies Liu] verify schema with the first few rows
8852aaf [Davies Liu] check type of schema
abe9e6e [Davies Liu] address comments
61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
1e5b801 [Davies Liu] improve cache of classes
51aa135 [Davies Liu] use Row to infer schema
e9c0d5c [Davies Liu] remove string typed schema
353a3f2 [Davies Liu] fix code style
63de8f8 [Davies Liu] fix typo
c79ca67 [Davies Liu] fix serialization of nested data
6b258b5 [Davies Liu] fix pep8
9d8447c [Davies Liu] apply schema provided by string of names
f5df97f [Davies Liu] refactor, address comments
9d9af55 [Davies Liu] use arrry to applySchema and infer schema in Python
84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested
0eaaf56 [Davies Liu] fix doc tests
b3559b4 [Davies Liu] use generated Row instead of namedtuple
c4ddc30 [Davies Liu] fix conflict between name of fields and variables
7f6f251 [Davies Liu] address all comments
d69d397 [Davies Liu] refactor
2cc2d45 [Davies Liu] refactor
182fb46 [Davies Liu] refactor
bc6e9e1 [Davies Liu] switch to new Schema API
547bf3e [Davies Liu] Merge branch 'master' into nested
a435b5a [Davies Liu] add docs and code refactor
2c8debc [Davies Liu] Merge branch 'master' into nested
644665a [Davies Liu] use tuple and namedtuple for schemardd
@davies davies deleted the nested branch September 15, 2014 22:18
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
Boson 0.2.5-beta includes the notIn parquet fix:

- build: Upgrade Arrow to 25.0.0 (pie/boson#599)
- feat: Support ansi mode of `sum` kernel (pie/boson#600) 
- build: Upgrade Parquet to 1.12.0.15-dev-apple (pie/boson#602)

Note this only affect when Boson is enabled in Spark.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants