
Conversation

@c21 c21 (Contributor) commented Aug 28, 2020

What changes were proposed in this pull request?

This is a followup to #29342 that does two things:

  • Per [SPARK-32399][SQL] Full outer shuffled hash join #29342 (comment), switch from Java HashSet to Spark's in-house OpenHashSet to track matched rows for non-unique join keys (see the sketch below). I checked the OpenHashSet implementation: it is built from a key index (OpenHashSet._bitset as BitSet) and a key array (OpenHashSet._data as Array). Java HashSet is built on top of HashMap, which stores values in Node linked lists and in theory should take more memory than OpenHashSet. I reran the same benchmark query used in [SPARK-32399][SQL] Full outer shuffled hash join #29342 and verified that the query performs similarly with HashSet and OpenHashSet.
  • Track the memory usage of the extra BitSet/OpenHashSet data structure in the full outer SHJ metrics. This depends on the first change, because there is no easy way to get the memory size of a Java HashSet.
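For illustration, here is a minimal sketch of the data-structure swap described above, assuming the matched build-side rows are identified by a (key index, value index) pair packed into a `Long` (the exact encoding used in the PR may differ):

```scala
import org.apache.spark.util.collection.OpenHashSet

// Spark's OpenHashSet is backed by a flat array (_data) plus a BitSet (_bitset),
// so unlike java.util.HashSet there are no per-entry HashMap.Node objects to allocate.
val matchedRows = new OpenHashSet[Long](64)

// Hypothetical packing of (keyIndex, valueIndex) into a single Long for non-unique keys.
def pack(keyIndex: Int, valueIndex: Int): Long =
  (keyIndex.toLong << 32) | (valueIndex & 0xFFFFFFFFL)

// While streaming the other side, mark build-side rows that found a match ...
matchedRows.add(pack(3, 7))
// ... and afterwards, emit only the build-side rows that were never marked.
val wasMatched = matchedRows.contains(pack(3, 7))
```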

Why are the changes needed?

To surface the memory usage of full outer SHJ more accurately.
This can help users and developers debug and improve full outer SHJ.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a unit test in SQLMetricsSuite.scala.

@c21 c21 (Contributor, Author) commented Aug 28, 2020

cc @maropu and @cloud-fan if you guys have time to take a look, thanks.

@c21 c21 changed the title [SPARK-32629] Track metrics of BitSet/OpenHashSet in full outer SHJ [SPARK-32629][SQL][FOLLOWUP] Track metrics of BitSet/OpenHashSet in full outer SHJ Aug 28, 2020
* @param plan `SparkPlan` operator to check metrics
* @param expectedMetrics the expected metrics. The format is `metric name -> metric value`.
*/
protected def testMetricsInSparkPlanOperator(
maropu (Member):

We need to put this func here instead of SQLMetricsSuite?

c21 (Contributor, Author):

@maropu - I am following the convention of the other methods here, e.g. testSparkPlanMetrics.
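For reference, a hypothetical call site for this helper, assuming it takes the operator's `SparkPlan` and a map of metric name -> expected value as the scaladoc above describes (`df` and the expected values below are illustrative only):

```scala
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

// Run the query first so the SQL metrics are populated.
df.collect()

// Grab the operator to check directly from the executed plan,
// then verify its metrics by name.
val shj = df.queryExecution.executedPlan.collectFirst {
  case j: ShuffledHashJoinExec => j
}.get
testMetricsInSparkPlanOperator(shj, Map("numOutputRows" -> 10L))
```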

// in full outer shuffled hash join
val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
val buildDataSize = longMetric("buildDataSize")
buildDataSize += matchedKeys.capacity / 8
maropu (Member):

nit: longMetric("buildDataSize") += matchedKeys.capacity / 8?

c21 (Contributor, Author):

@maropu - sure, updated.
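For context, a hedged sketch of what the metric bookkeeping could look like after applying the suggestion; the non-unique-key estimate below (8 bytes per `Long` slot plus roughly one bit per slot) is my assumption, not necessarily the exact expression merged in the PR:

```scala
import org.apache.spark.util.collection.{BitSet, OpenHashSet}

// Unique join keys: one bit per key index in the build-side relation.
def bitSetSizeInBytes(matchedKeys: BitSet): Long =
  matchedKeys.capacity / 8L // BitSet.capacity is in bits

// Non-unique join keys (assumption): flat Long array (8 bytes/slot) plus ~1 bit/slot
// for the key-index bit set inside OpenHashSet.
def openHashSetSizeInBytes(matchedRows: OpenHashSet[Long]): Long =
  matchedRows.capacity * 8L + matchedRows.capacity / 8

// In the operator, either estimate would then be added in one line, e.g.:
//   longMetric("buildDataSize") += bitSetSizeInBytes(matchedKeys)
```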

@maropu maropu (Member) commented Aug 28, 2020

Looks okay if the tests pass. Also cc: @agrawaldevesh

@maropu maropu changed the title [SPARK-32629][SQL][FOLLOWUP] Track metrics of BitSet/OpenHashSet in full outer SHJ [SPARK-32629][SQL] Track metrics of BitSet/OpenHashSet in full outer SHJ Aug 28, 2020
@maropu maropu (Member) commented Aug 28, 2020

Basically, we don't need the [FOLLOWUP] tag for a new ticket, I think.

@c21 c21 (Contributor, Author) commented Aug 28, 2020

> Basically, we don't need the [FOLLOWUP] tag for a new ticket, I think.

@maropu - cool, thanks.

@agrawaldevesh commented:

Wow, cool. So any improvement in either memory usage, GC, or CPU time by switching to open hashset?

* 1. Process rows from stream side by looking up hash relation.
* Mark the matched rows from build side be looked up.
* A `BitSet` is used to track matched rows with key index.
* A [[BitSet]] is used to track matched rows with key index.
cloud-fan (Contributor):

nit: just say A bit set is ... to be generic.

c21 (Contributor, Author):

@cloud-fan - sure, updated.


@SparkQA SparkQA commented Aug 28, 2020

Test build #127975 has finished for PR 29566 at commit 4a35fa6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21 c21 (Contributor, Author) commented Aug 28, 2020

> So any improvement in either memory usage, GC, or CPU time by switching to open hashset?

@agrawaldevesh - here is a report for one internal production query that does a FULL OUTER JOIN between one large table and one small table. I can't share the exact query text and data, but the query shape is:

INSERT OVERWRITE TABLE output_table
PARTITION (...)
SELECT ...
FROM large_table a
FULL OUTER JOIN small_table b
ON a.col_x = b.col_y

input metrics:
large_table (ORC format): uncompressed data input size: 54TB
small_table (ORC format): uncompressed data input size: 85GB

execution metrics:

Number of tasks per stage:
stage 0: 40547 (read large table)
stage 1: 158 (read small table)
stage 2: 5063 (SHJ and insert to output table)

Total shuffle bytes across executors: 15.1TB.

| Query type | Aggregated executors CPU time (ms) | Aggregated executors GC time (ms) |
|---|---|---|
| use java HashSet | 3.48 B | 124.0 M |
| use spark OpenHashSet | 3.22 B | 91.2 M |

TL;DR: by switching to OpenHashSet, we see roughly a 7% reduction in CPU time and a 27% reduction in GC time.
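For reference, those percentages follow from the table above: (3.48 − 3.22) / 3.48 ≈ 7.5% less CPU time and (124.0 − 91.2) / 124.0 ≈ 26.5% less GC time.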

@c21 c21 (Contributor, Author) commented Aug 28, 2020

Addressed all comments and the PR is ready for review again, thanks.

@SparkQA SparkQA commented Aug 28, 2020

Test build #127992 has finished for PR 29566 at commit b932caa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in cfe012a Aug 29, 2020
@maropu maropu (Member) commented Aug 29, 2020

Thanks! Merged to master.

@c21 c21 (Contributor, Author) commented Aug 29, 2020

Thanks @maropu , @cloud-fan and @agrawaldevesh for discussion and review!

@c21 c21 deleted the add-metrics branch August 29, 2020 22:29