[SPARK-32629][SQL] Track metrics of BitSet/OpenHashSet in full outer SHJ #29566
Conversation
cc @maropu and @cloud-fan if you guys have time to take a look, thanks.
* @param plan `SparkPlan` operator to check metrics
* @param expectedMetrics the expected metrics. The format is `metric name -> metric value`.
*/
protected def testMetricsInSparkPlanOperator(
We need to put this func here instead of `SQLMetricsSuite`?
@maropu - I am following the convention of other methods here, e.g. `testSparkPlanMetrics`.
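For context, the comparison such a helper performs can be sketched independently of Spark's test framework. Everything below (`MetricsCheck`, `checkMetrics`, and the map-based signature) is illustrative only, not the actual `testMetricsInSparkPlanOperator` implementation, which works on `SparkPlan` nodes:

```scala
// Illustrative sketch: given metric values collected from an operator and
// the expected `metric name -> metric value` map, report any mismatches.
// (Hypothetical helper, not Spark's real test utility.)
object MetricsCheck {
  def checkMetrics(
      actual: Map[String, Long],
      expected: Map[String, Long]): Seq[String] =
    expected.toSeq.flatMap { case (name, want) =>
      actual.get(name) match {
        case Some(got) if got == want => Nil
        case Some(got) => Seq(s"metric '$name': expected $want, got $got")
        case None      => Seq(s"metric '$name': missing")
      }
    }
}
```

An empty result means all expected metrics matched; a test would assert exactly that.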
// in full outer shuffled hash join
val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
val buildDataSize = longMetric("buildDataSize")
buildDataSize += matchedKeys.capacity / 8
nit: `longMetric("buildDataSize") += matchedKeys.capacity / 8`?
@maropu - sure, updated.
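The `capacity / 8` term above converts the bit set's capacity in bits to a size in bytes, which is what gets added to the `buildDataSize` metric. A minimal sketch of that accounting with a hand-rolled fixed-size bit set (`SimpleBitSet` and `sizeInBytes` are illustrative names, not Spark's `BitSet` API):

```scala
// A minimal fixed-size bit set, illustrating why tracking `numBits` slots
// costs roughly numBits / 8 bytes of memory.
class SimpleBitSet(numBits: Int) {
  // One Long word holds 64 bits; round up so every slot is covered.
  private val words = new Array[Long]((numBits + 63) / 64)

  def set(index: Int): Unit =
    words(index >> 6) |= (1L << (index & 63))

  def get(index: Int): Boolean =
    (words(index >> 6) & (1L << (index & 63))) != 0

  // Capacity in bits, as actually allocated (a multiple of 64).
  def capacity: Int = words.length * 64

  // The quantity the metric tracks: bits converted to bytes.
  def sizeInBytes: Long = capacity / 8
}
```

For example, asking for 100 bits allocates 2 words (128 bits), so `sizeInBytes` is 16.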
Looks okay if the tests pass. Also cc: @agrawaldevesh
Basically, we don't need the
@maropu - cool, thanks.
Wow, cool. So any improvement in either memory usage, GC, or CPU time by switching to open hashset?
  * 1. Process rows from stream side by looking up hash relation.
  *    Mark the matched rows from build side be looked up.
- * A `BitSet` is used to track matched rows with key index.
+ * A [[BitSet]] is used to track matched rows with key index.
nit: just say "A bit set is ..." to be generic.
@cloud-fan - sure, updated.
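The two steps in this doc comment (probe the hash relation from the stream side while marking matched build rows, then emit the build rows that were never marked) can be sketched with plain Scala collections. `fullOuterHashJoin` below is an illustrative toy, not Spark's operator; a `mutable.BitSet` over build-row indices plays the role of Spark's matched-keys bit set:

```scala
import scala.collection.mutable

// Toy full outer hash join. Illustrative only: Spark's version streams
// rows and tracks matches by key index inside the hashed relation.
def fullOuterHashJoin[K, L, R](
    stream: Seq[(K, L)],
    build: Seq[(K, R)]): Seq[(Option[L], Option[R])] = {
  // Build the hash relation: key -> indices of build rows with that key.
  val relation: Map[K, Seq[Int]] = build.indices.groupBy(i => build(i)._1)
  val matched = mutable.BitSet.empty

  // Step 1: probe with stream rows, marking every matched build row.
  val streamSide: Seq[(Option[L], Option[R])] = stream.flatMap { case (k, l) =>
    relation.get(k) match {
      case Some(idxs) =>
        idxs.foreach(matched += _)
        idxs.map(i => (Some(l), Some(build(i)._2)))
      case None => Seq((Some(l), None)) // stream row with no match
    }
  }
  // Step 2: emit build rows that were never marked as matched.
  val buildSide: Seq[(Option[L], Option[R])] =
    build.indices.filterNot(matched).map(i => (None, Some(build(i)._2)))
  streamSide ++ buildSide
}
```

For example, joining stream `Seq((1,"a"), (3,"c"))` with build `Seq((1,"x"), (2,"y"))` yields the matched pair, the unmatched stream row, and the unmatched build row.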
Test build #127975 has finished for PR 29566 at commit
@agrawaldevesh - here is a report for one production query internally which is doing a FULL OUTER JOIN between one large table and one small table. Couldn't share the exact query text and data, but the query shape is:

INSERT OVERWRITE TABLE output_table
PARTITION (...)
SELECT ...
FROM large_table a
FULL OUTER JOIN small_table b
ON a.col_x = f.col_y

Input metrics, execution metrics, and number of tasks per stage: (figures not preserved). Total shuffle bytes across executors: 15.1TB.
TLDR is switching to
Addressed all comments and the PR is ready for review again, thanks.
Test build #127992 has finished for PR 29566 at commit
Thanks! Merged to master.
Thanks @maropu, @cloud-fan and @agrawaldevesh for discussion and review!
What changes were proposed in this pull request?
This is a followup from #29342, to do two things:

1. Change from java `HashSet` to Spark in-house `OpenHashSet` to track matched rows for non-unique join keys. I checked the `OpenHashSet` implementation, which is built from a key index (`OpenHashSet._bitset` as `BitSet`) and a key array (`OpenHashSet._data` as `Array`). Java `HashSet` is built from `HashMap`, which stores values in `Node` linked lists and in theory should take more memory than `OpenHashSet`. Reran the same benchmark query used in [SPARK-32399][SQL] Full outer shuffled hash join #29342, and verified the query has similar performance here between `HashSet` and `OpenHashSet`.
2. Track memory size of `BitSet`/`OpenHashSet` for full outer SHJ. This depends on the change above, because there seems to be no easy way to get java `HashSet` memory size.

Why are the changes needed?
To surface the memory usage of full outer SHJ more accurately.
This can help users/developers to debug/improve full outer SHJ.
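The memory argument above rests on `OpenHashSet`'s flat layout: a bit set marking occupied slots plus a plain key array, with no per-entry node objects. A toy open-addressing set showing that layout (the names, the `Int`-only keys, and the no-resize simplification are mine, not Spark's `OpenHashSet` API, which grows and rehashes as it fills):

```scala
// Toy open-addressing hash set over Ints: `occupied` stands in for the
// key-index bit set (OpenHashSet._bitset) and `data` for the flat key
// array (OpenHashSet._data). Fixed capacity, no growth -- the caller must
// insert fewer distinct keys than `capacity`. Illustrative only.
class ToyOpenHashSet(capacity: Int) {
  require((capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
  private val occupied = new Array[Boolean](capacity)
  private val data = new Array[Int](capacity)
  private val mask = capacity - 1

  def add(k: Int): Unit = {
    var pos = k & mask
    // Linear probing: walk until we find k or a free slot.
    while (occupied(pos) && data(pos) != k) pos = (pos + 1) & mask
    occupied(pos) = true
    data(pos) = k
  }

  def contains(k: Int): Boolean = {
    var pos = k & mask
    while (occupied(pos)) {
      if (data(pos) == k) return true
      pos = (pos + 1) & mask
    }
    false
  }
}
```

Because both arrays are flat and sized by capacity, the set's memory footprint is straightforward to compute, which is what makes the `buildDataSize` tracking in this PR feasible; java `HashSet`'s linked `Node` entries have no comparably cheap size estimate.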
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit test in `SQLMetricsSuite.scala`.