[SPARK-17549][sql] Only collect table size stat in driver for cached relation. #15112
Conversation
The existing code caches all stats for all columns for each partition in the driver; for a large relation, this causes extreme memory usage, which leads to GC hell and application failures.

It seems that only the size in bytes of the data is actually used in the driver, so instead just collect that. In executors, the full stats are still kept, but that's not a big problem; we expect the data to be distributed and thus not incur too much memory pressure in each individual executor.

There are also potential improvements on the executor side, since the data being stored currently is very wasteful (e.g. storing boxed types vs. primitive types for stats). But that's a separate issue.

On a mildly related change, I'm also adding code to catch exceptions in the code generator, since Janino was breaking with the test data I tried this patch on.

Tested with unit tests and by doing a count on a very wide table (20k columns) with many partitions.
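The driver-side change the description outlines can be illustrated with a small stand-alone sketch (hypothetical names, not the actual Spark classes): instead of retaining a stats object per column per partition on the driver, only a single running byte count is kept.

```scala
// Hypothetical sketch of the memory-footprint difference; these are
// illustrative stand-ins, not the real Spark classes.

// Before: one of these per column per partition, held on the driver.
// With 20k columns and many partitions this is enormous.
case class ColumnStats(min: Any, max: Any, sizeInBytes: Long)

// After: a single accumulated long on the driver.
final class SizeAccumulator {
  private var total: Long = 0L
  def add(partitionSizeInBytes: Long): Unit = total += partitionSizeInBytes
  def value: Long = total
}

object SizeAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val acc = new SizeAccumulator
    // Each partition reports only its byte size, not per-column stats.
    Seq(1024L, 2048L, 512L).foreach(acc.add)
    assert(acc.value == 3584L)
    println(acc.value)
  }
}
```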
I haven't touched this part for a long time. I think we also use min/max to evaluate predicates. Can you double check? Also, what stats do we collect right now?

The current code collects […]. On the driver side, all that seems to be done is to sum up the sizes of each column to provide a […].

I see that the stats are used in […].

Thanks! I will take a look. BTW, if you have a comparison of the memory footprint before and after the change, it would be good to add that to the description.

That's all in the bug.
if (a.getClass.getName == codeAttr.getName) {
  CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
    codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
}
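The snippet above records generated-method bytecode sizes into a metrics histogram; the surrounding discussion is about wrapping that best-effort instrumentation so a failure inside it (e.g. a Janino internals change) cannot abort the query. A minimal sketch of that guard pattern, with hypothetical names for the metrics sink and logger:

```scala
import scala.util.control.NonFatal

object MetricsGuard {
  // Hypothetical stand-ins for the real metrics sink and logger.
  private var recordedBytecodeSize: Option[Int] = None
  private var lastWarning: Option[String] = None

  // Run a best-effort metrics update; never let it fail the caller.
  def updateMetricsSafely(body: => Int): Unit =
    try {
      recordedBytecodeSize = Some(body)
    } catch {
      case NonFatal(e) =>
        lastWarning = Some(s"Error calculating stats of compiled class: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    // A failing metrics computation is swallowed and logged...
    updateMetricsSafely(throw new RuntimeException("janino internals changed"))
    assert(recordedBytecodeSize.isEmpty)
    assert(lastWarning.isDefined)
    // ...while a successful one is recorded normally.
    updateMetricsSafely(42)
    assert(recordedBytecodeSize.contains(42))
    println("ok")
  }
}
```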
When will this block fail?
See bug.
OK. Thanks. Seems it is a separate issue. Let's not change this part.
Oops, I forgot to send the above comment...
But without this fix you can't read the table I described in the bug at all, because SQL just blows up. If you want a separate patch just for that ok, but seems like overkill to me.
My worry is that we will just forget about this issue if we only make it log a warning. Removing this try/catch will not fail any existing tests, right? We can create a new JIRA to fix this issue for Spark 2.0.
How about my suggestion of adding the workaround and filing a bug? Then there's no worry about forgetting anything.
Because it's most probably a Janino bug, fixing it might not be as simple as just making some change in Spark.
OK. Seems this part is used to record some metrics. I guess it is fine. But, let me ping @ericl who added this method to double check.
In any case I filed SPARK-17565 to track the actual fix. This is just a workaround so Spark doesn't fail.
This seems ok to me. We have a unit test for the metric here so it isn't likely to break entirely without notice.
// Before: driver materializes a stats row per cached batch and sums sizes.
val sizeInBytes =
  batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
Statistics(sizeInBytes = sizeInBytes)

// After: driver reads a single long accumulator.
Statistics(sizeInBytes = batchStats.value.longValue)
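The shape of the new code can be sketched without Spark by using a plain `AtomicLong` as a stand-in for the long accumulator (hypothetical names; the real code uses Spark's accumulator API):

```scala
import java.util.concurrent.atomic.AtomicLong

// Stand-in for the accumulator backing batchStats: executors add each
// cached batch's byte size; the driver only ever reads one long.
object BatchStatsDemo {
  def main(args: Array[String]): Unit = {
    val batchStats = new AtomicLong(0L)
    // "Executor" side: each cached batch contributes its size in bytes.
    Seq(4096L, 4096L, 1024L).foreach(batchStats.addAndGet)
    // "Driver" side: the whole statistic is one long, not a row per batch.
    val sizeInBytes = batchStats.longValue
    assert(sizeInBytes == 9216L)
    println(sizeInBytes)
  }
}
```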
Can you double check whether we have a test to make sure the total size is correct?
Given that I changed the stat and all tests still passed locally, I doubt we have one... I'll take a look once I find some time to get back to this patch.
Test build #65449 has finished for PR 15112 at commit
// Check that the right size was calculated.
assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
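The assertion above checks that the accumulated size equals the row count times the column type's default size. The arithmetic can be mirrored in isolation (hypothetical helper; assumes a single INT column at 4 bytes per value, matching Catalyst's `IntegerType.defaultSize`):

```scala
object CachedSizeCheck {
  // Default on-heap size of one INT value, as in Catalyst's IntegerType.
  val IntDefaultSize = 4

  // Expected cached size for a single INT column: rows * 4 bytes.
  def expectedSize(rowCount: Int): Long = rowCount.toLong * IntDefaultSize

  def main(args: Array[String]): Unit = {
    assert(expectedSize(10) == 40L)
    println(expectedSize(10))
  }
}
```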
Thanks!
Test build #65459 has finished for PR 15112 at commit
@yhuai any more comments? I really want to keep the codegen metrics change, because otherwise Spark just fails on the large table I tested on. We can file a separate bug to look at the Janino issue and point at this one (and the data attached to the bug) as the source of the issue.
} catch {
  case e: Exception =>
NonFatal(e)?
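The reviewer's suggestion is to match with Scala's `NonFatal` extractor instead of `case e: Exception`: `NonFatal` matches most throwables but deliberately refuses fatal ones (e.g. `VirtualMachineError` such as `OutOfMemoryError`), which should propagate rather than be swallowed. A minimal illustration:

```scala
import scala.util.control.NonFatal

object NonFatalDemo {
  def classify(t: Throwable): String = t match {
    case NonFatal(_) => "non-fatal" // safe to log and continue
    case _           => "fatal"     // should propagate (OOM, etc.)
  }

  def main(args: Array[String]): Unit = {
    assert(classify(new RuntimeException("boom")) == "non-fatal")
    assert(classify(new OutOfMemoryError("heap")) == "fatal")
    println("ok")
  }
}
```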
Test build #65501 has finished for PR 15112 at commit
LGTM. Merging to master and branch 2.0.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #15112 from vanzin/SPARK-17549.
(cherry picked from commit 39e2bad)
Signed-off-by: Yin Huai <yhuai@databricks.com>