[SPARK-14334] [SQL] add toLocalIterator for Dataset/DataFrame #12114
Hm, we need to think about the API. This is a Scala iterator, right? That is going to be problematic for Java users. Maybe we can have toLocalScalaIterator and toLocalJavaIterator, or just have toLocalIterator return a Java iterator; Scala users can easily rely on the implicit conversion or use asScala anyway.
@rxin Good point, will use the Java iterator as the public API.
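For readers following the API discussion above, here is a minimal sketch of what the Scala side looks like when a method hands back a Java iterator. The `localRows` helper is hypothetical and only stands in for a method with a `java.util.Iterator` return type; it is not Spark code. The sketch assumes `scala.collection.JavaConverters`, which matches the Scala versions Spark targeted at the time (on Scala 2.13+ the equivalent import is `scala.jdk.CollectionConverters._`).

```scala
import java.util.{Arrays, Iterator => JIterator}
import scala.collection.JavaConverters._

object JavaIteratorInterop {
  // Hypothetical stand-in for an API that returns a Java iterator,
  // as proposed for toLocalIterator in this thread.
  def localRows(): JIterator[String] =
    Arrays.asList("a", "b", "c").iterator()

  def main(args: Array[String]): Unit = {
    // asScala wraps the Java iterator lazily; no elements are copied.
    val rows: Iterator[String] = localRows().asScala
    rows.map(_.toUpperCase).foreach(println)
  }
}
```

The conversion is a single method call on the Scala side, while Java callers get a type they can use directly, which is the trade-off the comment above argues for.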
Test build #54715 has finished for PR 12114 at commit
Test build #54714 has finished for PR 12114 at commit
Test build #54716 has finished for PR 12114 at commit
cc @sameeragarwal for review
@@ -71,6 +71,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
assert(ds.first() == item)
assert(ds.take(1).head == item)
assert(ds.takeAsList(1).get(0) == item)
assert(ds.toLocalIterator().next === item)
nit: next()
LGTM
Test build #54869 has finished for PR 12114 at commit
Test build #54872 has finished for PR 12114 at commit
What changes were proposed in this pull request?
RDD.toLocalIterator() can be used to fetch one partition at a time to reduce memory usage. Right now, for Dataset/DataFrame we have to use df.rdd.toLocalIterator, which is very slow and also requires a lot of memory (because of the Java serializer, or even the Kryo serializer).
This PR introduces an optimized toLocalIterator for Dataset/DataFrame, which is much faster and requires much less memory. For a partition with 5 million rows, df.rdd.toLocalIterator took about 100 seconds, but df.toLocalIterator took less than 7 seconds. For 10 million rows, df.rdd.toLocalIterator crashes (not enough memory) with a 4G heap, but df.toLocalIterator finished in 12 seconds.
The JDBC server has been updated to use DataFrame.toLocalIterator.
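As an illustration of the usage this description has in mind, the following is a minimal sketch, not code from the patch. It assumes a Spark 2.x-style `SparkSession`, a local master, and the Java-iterator return type agreed on in the review; the object name, the `sumIds` helper, and the row and partition counts are all hypothetical.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.JavaConverters._

object LocalIteratorSketch {
  // Sums the "id" column on the driver, pulling rows through
  // Dataset.toLocalIterator instead of collecting the whole result.
  def sumIds(spark: SparkSession, n: Long, partitions: Int): Long = {
    val df = spark.range(0, n).toDF("id").repartition(partitions)
    // toLocalIterator() returns a java.util.Iterator[Row];
    // asScala wraps it so Scala collection methods are available.
    val rows: Iterator[Row] = df.toLocalIterator().asScala
    rows.map(_.getLong(0)).sum
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local two-core master is enough for the sketch.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("toLocalIterator-sketch")
      .getOrCreate()
    try println(sumIds(spark, 1000L, 4))
    finally spark.stop()
  }
}
```

The older path described above would be `df.rdd.toLocalIterator`, which already yields a Scala iterator but goes through the RDD serialization that this PR avoids.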
How was this patch tested?
Existing tests.