[SPARK-2314][SQL] Override collect and take in python library, and count in java library, with optimized versions. #1592

Closed
staple wants to merge 5 commits

Conversation

@staple (Contributor) commented Jul 25, 2014

SchemaRDD overrides RDD functions, including collect, count, and take, with optimized versions that make use of the query optimizer. The Java and Python interface classes wrapping SchemaRDD need to ensure the optimized versions are called as well. This patch overrides the relevant calls in the Python and Java interfaces with optimized versions.

Adds a new Row serialization pathway between Python and Java, based on JList[Array[Byte]] rather than the existing RDD[Array[Byte]]. I wasn’t overjoyed about doing this, but I noticed that some QueryPlans implement optimizations in executeCollect(), which outputs an Array[Row] rather than the typical RDD[Row] that can be shipped to Python using the existing serialization code. To me it made sense to ship the Array[Row] over to Python directly instead of converting it back to an RDD[Row] just to send the Rows to Python using the existing serialization code.
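
To make the mechanism concrete, here is a minimal, self-contained Python sketch of the override pattern described above. Every name in it is a hypothetical stand-in, not the actual pyspark internals; in the real patch the optimized path is a Py4J call into the JVM-side SchemaRDD, and count and take follow the same idea.

import pickle

class GenericRDD(object):
    def __init__(self, rows):
        self._rows = rows

    def collect(self):
        # Generic path: materialize the rows through an ordinary RDD job.
        return list(self._rows)

class SchemaRDDLike(GenericRDD):
    def collect(self):
        # Optimized path: fetch a batch of rows the JVM side already
        # collected via executeCollect(), serialized as a list of byte
        # arrays (standing in for JList[Array[Byte]]), and deserialize
        # them locally instead of running a full RDD job.
        return [pickle.loads(b) for b in self._optimized_execute_collect()]

    def _optimized_execute_collect(self):
        # Stand-in for the Py4J call into the optimized JVM-side collect.
        return [pickle.dumps(row) for row in self._rows]

print(SchemaRDDLike([("Alice", 1), ("Bob", 2)]).collect())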

@AmplabJenkins:

Can one of the admins verify this patch?

@marmbrus (Contributor):

Thanks for working on this! We'll need to coordinate merging with #1346 and related PRs. (cc @yhuai)

@JoshRosen can you look at the other pyspark changes?

@staple (Contributor, Author) commented Jul 30, 2014

Sure, I’m fine with reworking based on other changes (it seems that some merge conflicts have already cropped up in master since I submitted my PR last week). I think my change set is a little simpler than the one you linked to, so would it make sense for me to wait until that one goes in?

I also thought I’d add a couple of notes on what I had in mind with this patch:

  1. I added a new Row serialization pathway between python and java, based on JList[Array[Byte]] versus the existing RDD[Array[Byte]]. I wasn’t overjoyed about doing this, but I noticed that some QueryPlans implement optimizations in executeCollect(), which outputs an Array[Row] rather than the typical RDD[Row] that can be shipped to python using the existing serialization code. To me it made sense to ship the Array[Row] over to python directly instead of converting it back to an RDD[Row] just for the purpose of sending the Rows to python using the existing serialization code. But let me know if you have any thoughts about this.

  2. I moved JavaStackTrace from rdd.py to context.py. This made sense to me since JavaStackTrace is all about configuring a context attribute, and the _extract_concise_traceback function it depends on (sketched below) was already being called separately from context.py, even though it was a ‘private’ function of rdd.py.
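
For context, here is a rough, self-contained sketch of what a concise-traceback helper along these lines does. The body is illustrative only, assuming the function lives inside the pyspark package; it is not the actual implementation of _extract_concise_traceback.

import os
import traceback

def extract_concise_traceback():
    # Walk the stack from the innermost frame outward and report the first
    # frame that lives outside the pyspark install directory, i.e. the
    # user's call site, as a short "file:line" string.
    pyspark_dir = os.path.dirname(os.path.abspath(__file__))
    for filename, lineno, _name, _line in reversed(traceback.extract_stack()):
        if not os.path.abspath(filename).startswith(pyspark_dir):
            return "%s:%d" % (os.path.basename(filename), lineno)
    return None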

@marmbrus (Contributor):

Yeah, I'm hoping to merge #1346 as soon as it passes Jenkins, so I'd wait for that.

I also thought I’d add a couple of notes on what I had in mind with this patch: ...

Can you add these notes to the PR description so that they get included in the commit message?

@staple (Contributor, Author) commented Jul 30, 2014

Sure, added the notes.

@staple (Contributor, Author) commented Jul 31, 2014

Hi, I've updated the patch to work with the new code in master.

@marmbrus (Contributor):

cc @davies

@davies (Contributor) commented Jul 31, 2014

@staple Could you hold off one more day until we merge the changes in #1598 about serialization between Java and Python?

@staple (Contributor, Author) commented Jul 31, 2014

Sure, no problem.

@davies (Contributor) commented Aug 2, 2014

@staple could you rebase this PR onto #1598? It's very close to being merged.

@marmbrus (Contributor) commented Aug 3, 2014

This seems to have captured a bunch of unrelated changes during the rebase.

@staple (Contributor, Author) commented Aug 3, 2014

Sorry, I'm away from home and had limited time and access when I attempted the merge last night - I didn't finish it, and as you mentioned it pulled in a bunch of unrelated commits. I'll post an explicit comment here when the merge is ready.

@staple (Contributor, Author) commented Aug 4, 2014

Hi folks, I’ve merged with the most recent code (pushed to my branch), but since the merge I’m getting NPEs from Kryo in the sql.py tests for schemas containing array-typed fields. I’m away from home, with no real dev system and spotty internet access until Thursday, so unfortunately I don't think I can diagnose the problem until then. Sorry for the delay.

@marmbrus (Contributor) commented Sep 3, 2014

Hey @staple, if you have time to update this, it would be great to include it.

@SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@staple (Contributor, Author) commented Sep 6, 2014

Sorry for the delay. I wrote a simple pyspark test case that triggers the Kryo NPEs I've started seeing; it doesn't require my patch. I tested against master at ba5bcad.

from pyspark.sql import Row, SQLContext

# Run in the pyspark shell, where sc is the predefined SparkContext.
OneField = Row('f1')
rdd = sc.parallelize([OneField(['x'])])  # one row with an array-typed field
sqlCtx = SQLContext(sc)
srdd = sqlCtx.inferSchema(rdd)
srdd._jschema_rdd.collect()  # triggers the Kryo NPE on the driver

The Kryo NPE comes from the TaskResultGetter:

14/09/05 21:59:47 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
underlying (scala.collection.convert.Wrappers$JListWrapper)
values (org.apache.spark.sql.catalyst.expressions.GenericRow)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1276)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)
Caused by: java.lang.NullPointerException
at scala.collection.convert.Wrappers$MutableBufferWrapper.add(Wrappers.scala:80)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
... 23 more

@davies, do you have any insight as to the cause?

@davies (Contributor) commented Sep 6, 2014

@marmbrus It seems that JListWrapper() cannot be serialized correctly by Kryo, but I don't know how to reproduce the problem in Scala.

The bug can be reproduced with one line of Python:

from pyspark.sql import SQLContext; SQLContext(sc).inferSchema(sc.parallelize([{"a": [3]}]))._jschema_rdd.collect()

@marmbrus (Contributor) commented Sep 7, 2014

Hmm, that Kryo error is unfortunate. We'll probably need to add a special serializer in SparkSqlSerializer to work around this.
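
The actual workaround would live in Scala (registering a custom Kryo serializer for the offending wrapper type inside SparkSqlSerializer). As a language-neutral illustration of the same idea - registering a special serialization rule for a type the default machinery mishandles - here is a small Python analog using copyreg; the Wrapper class is purely hypothetical.

import copyreg
import pickle

class Wrapper(object):
    # Hypothetical stand-in for a wrapper type (like JListWrapper) that the
    # default serializer cannot round-trip.
    def __init__(self, underlying):
        self.underlying = underlying

def reduce_wrapper(w):
    # Custom rule: persist only the underlying data and rebuild the wrapper
    # on deserialization, sidestepping the problematic internals.
    return (Wrapper, (list(w.underlying),))

copyreg.pickle(Wrapper, reduce_wrapper)

restored = pickle.loads(pickle.dumps(Wrapper([1, 2, 3])))
print(restored.underlying)  # [1, 2, 3]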

@marmbrus (Contributor) commented Sep 8, 2014

#2323 seems to fix the Kryo error for me. Let me know if you have further issues after updating this to master.

@SparkQA commented Sep 9, 2014

QA tests have started for PR 1592. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/8/consoleFull

@staple (Contributor, Author) commented Sep 9, 2014

Great! I'll go ahead and merge once #2323 is in.

@marmbrus (Contributor):

I just merged #2323 to master.

@staple (Contributor, Author) commented Sep 11, 2014

Thanks! I'm not getting any more NPEs now. I went ahead and merged.

@SparkQA commented Sep 11, 2014

QA tests have started for PR 1592 at commit e3b802f.

  • This patch merges cleanly.

@SparkQA commented Sep 11, 2014

QA tests have finished for PR 1592 at commit e3b802f.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JavaStackTrace(object):

@staple (Contributor, Author) commented Sep 11, 2014

Hi, the failing test was CheckpointSuite / 'recovery with file input stream'. This test passed when I ran the tests locally, and it sometimes fails spuriously according to this ticket:

'flaky test case in streaming.CheckpointSuite'
https://issues.apache.org/jira/browse/SPARK-1600

@staple (Contributor, Author) commented Sep 11, 2014

Also, I'm assuming I don't have permission to ask Jenkins to run a test myself, right?

@JoshRosen (Contributor):

@staple The Jenkins pull request builder is in an odd state of flux right now. I've manually re-triggered your build (I should have self-service "retest this please" working more consistently by sometime next week).

@SparkQA commented Sep 11, 2014

QA tests have started for PR 1592 at commit e3b802f.

  • This patch merges cleanly.

@staple (Contributor, Author) commented Sep 11, 2014

@JoshRosen Great, thanks for your help!

@SparkQA commented Sep 11, 2014

QA tests have finished for PR 1592 at commit e3b802f.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JavaStackTrace(object):

@marmbrus (Contributor):

SQL LGTM.

@JoshRosen is this ready to go?

@JoshRosen (Contributor) commented on an inline diff:

@@ -36,6 +37,65 @@

from py4j.java_collections import ListConverter

__all__ = ["JavaStackTrace", "SparkContext"]

This file sets __all__ further down on line 100; we should only set it once. Actually, do you mind just moving the extract_concise_traceback stuff to its own file? There's an open JIRA ticket for this (SPARK-1087), and if you're going to touch the traceback code anyway, now seems like a good time to do the refactoring. You could name the file something like traceback_utils.py.

@JoshRosen (Contributor):

@staple @marmbrus Aside from my comments on moving the traceback functions into their own file, this looks good to me.

@staple (Contributor, Author) commented Sep 14, 2014

Thanks for taking a look, guys.

Hmm, it looks like the duplicate __all__ variables resulted from a recent merge. I went ahead and created a separate PR for SPARK-1087, putting the traceback code in its own file: #2385. Once that’s merged to master I’ll circle back and finish up this PR, making the changes you requested.

@JoshRosen (Contributor):

Now that #2385 has been merged, this looks like it will be ready to merge as soon as you rebase it on top of master.

@staple (Contributor, Author) commented Sep 16, 2014

Ok, just merged with master.

@marmbrus (Contributor):

Thanks! I've merged this to master.

@asfgit closed this in 8e7ae47 on Sep 16, 2014
@staple (Contributor, Author) commented Sep 16, 2014

Great! Thanks.

sunchao added a commit to sunchao/spark referencing this pull request on Jun 2, 2023 (…ache#1592):

This fixes two Spark test failures related to Boson:
- `ParquetEncryptionSuite` is failing in Boson because `org.apache.parquet.crypto.keytools.mocks.InMemoryKMS` is not included in the shaded `boson-spark` jar.
- `HadoopFsRelationTest` is failing because it generates a datetime that is before the cutoff date of the Proleptic Gregorian calendar.