[SPARK-30409][SPARK-29173][SQL][TESTS] Use NoOp datasource in SQL benchmarks #27078

Closed · wants to merge 42 commits

Conversation

@MaxGekk (Member) commented Jan 2, 2020

What changes were proposed in this pull request?

In the PR, I propose to replace `.collect()`, `.count()`, and `.foreach(_ => ())` in SQL benchmarks with the NoOp datasource. I added an implicit class to `SqlBasedBenchmark` with a `.noop()` method, which can be used in benchmarks as `ds.noop()`. That call unfolds to `ds.write.format("noop").mode(Overwrite).save()`.
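As a minimal sketch, the helper could look like the following (the class name `NoopDataset` is illustrative, not necessarily the name used in `SqlBasedBenchmark`):

```
import org.apache.spark.sql.{Dataset, SaveMode}

// Adds a `.noop()` shorthand that materializes a Dataset through the
// built-in `noop` sink instead of an action such as `.collect()`.
implicit class NoopDataset(ds: Dataset[_]) {
  def noop(): Unit = ds.write.format("noop").mode(SaveMode.Overwrite).save()
}
```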

Why are the changes needed?

To avoid the additional overhead that `collect()` (and other actions) have. For example, `.collect()` has to convert values to external types and pull data to the driver. This can hide actual performance regressions or improvements in the benchmarked operations.
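For illustration (a hypothetical benchmark body, not code from this PR), the change amounts to swapping the terminal action:

```
// Before: the action converts rows to external types and ships them to
// the driver, adding overhead unrelated to the measured operation.
spark.range(100000000L).selectExpr("id % 7 AS m").collect()

// After: the noop sink materializes every row without that overhead.
spark.range(100000000L).selectExpr("id % 7 AS m").noop()
```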

Does this PR introduce any user-facing change?

No

How was this patch tested?

Re-ran all modified benchmarks using Amazon EC2:

Item      Description
--------  -----------------------------------------------------------------------------------------
Region    us-west-2 (Oregon)
Instance  r3.xlarge (spot instance)
AMI       ami-06f2f779464715dc5 (ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1)
Java      OpenJDK8/11
```
# Build Spark 2.4; `spark-tpcds-datagen` needs this. (JDK8)
$ git clone https://github.com/apache/spark.git -b branch-2.4 --depth 1 spark-2.4
$ export SPARK_HOME=$PWD/spark-2.4
$ ./spark-2.4/build/mvn clean package -DskipTests

# Generate data. (JDK8)
$ git clone git@github.com:maropu/spark-tpcds-datagen.git
$ cd spark-tpcds-datagen/
$ build/mvn clean package
$ mkdir -p /data/tpcds
$ ./bin/dsdgen --output-location /data/tpcds/s1  # This needs Spark 2.4
```
  • Other benchmarks were run with the script below:
```
#!/usr/bin/env python3

import os
from sparktestsupport.shellutils import run_cmd

benchmarks = [
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.AggregateBenchmark'],
    ['avro/test', 'org.apache.spark.sql.execution.benchmark.AvroReadBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.DataSourceReadBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.DateTimeBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.ExtractBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.FilterPushdownBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.InExpressionBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.IntervalBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.JoinBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.MakeDateTimeBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.MiscBenchmark'],
    ['hive/test', 'org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.OrcNestedSchemaPruningBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.OrcV2NestedSchemaPruningBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.ParquetNestedSchemaPruningBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.RangeBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.UDFBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.WideSchemaBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.WideTableBenchmark'],
    ['hive/test', 'org.apache.spark.sql.hive.orc.OrcReadBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.datasources.csv.CSVBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.datasources.json.JsonBenchmark']
]

print('Set SPARK_GENERATE_BENCHMARK_FILES=1')
os.environ['SPARK_GENERATE_BENCHMARK_FILES'] = '1'

for b in benchmarks:
    print("Run benchmark: %s" % b[1])
    run_cmd(['build/sbt', '%s:runMain %s' % (b[0], b[1])])
```

@srowen (Member) left a comment:

Looks reasonable to me, if that's the best way to execute without doing anything else.

@SparkQA commented Jan 2, 2020

Test build #116042 has finished for PR 27078 at commit 6615d5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Jan 2, 2020

... if that's the best way to execute without doing anything else

The NoOp datasource just materializes rows and doesn't do anything else.
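Conceptually (a sketch, not the actual `NoopDataSource` code), writing to the noop sink is roughly equivalent to consuming the executed plan's rows on the executors without retaining them:

```
// Roughly what "materialize and do nothing else" means: iterate over
// every InternalRow of the physical plan, keeping nothing.
df.queryExecution.toRdd.foreach(_ => ())
```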

@HyukjinKwon (Member) commented:

Should we update the benchmark results too?

@MaxGekk (Member, Author) commented Jan 3, 2020

Should we update the benchmark results too?

I can run all SQL benchmarks locally on macOS, or I can launch an EC2 instance with Linux (not sure how much it will cost me). My only concerns are:

  1. New benchmark results will significantly increase the PR size.
  2. @dongjoon-hyun usually runs all benchmarks on a particular configuration. I don't know how to reproduce the same environment on my side.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-30409][SQL][TESTS] Use NoOp datasource in SQL benchmarks [SPARK-30409][SPARK-29173][SQL][TESTS] Use NoOp datasource in SQL benchmarks Jan 11, 2020
@dongjoon-hyun (Member) commented:

Here is the TPCDS result, @MaxGekk.

@SparkQA commented Jan 11, 2020

Test build #116511 has finished for PR 27078 at commit 0cfe42a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Jan 12, 2020

@dongjoon-hyun oh, sorry I missed your second PR.

@dongjoon-hyun (Member) commented:

Thank you. No problem, @MaxGekk.

What I want to say is that this NoOp PR improves many cases as designed, but some cases show a regression in execution time. I believe it's not due to the change in this PR; it would be a change in the underlying master branch code.

@MaxGekk (Member, Author) commented Jan 12, 2020

It would be a change in the underlying master branch code.

I think it makes sense to open a separate JIRA for each case, as we did before?

```
                                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------
Join w long wholestage off                     4685           4814         182          4.5         223.4       1.0X
Join w long wholestage on                       440            524         102         47.7          21.0      10.7X
Join w long wholestage off                     4531           4557          37          4.6         216.1       1.0X
Join w long wholestage on                      1214           1310          95         17.3          57.9       3.7X
```
@dongjoon-hyun (Member) commented Jan 12, 2020:

@MaxGekk, for example, this case slowed down ~3x in both JDK8 and JDK11, and it's mostly consistent across the other cases in this suite. It's a huge regression, which is orthogonal to this PR.

Member commented:

Could you file a JIRA issue for this JoinBenchmark?

Member Author commented:

Sure, I will do that tomorrow.

@dongjoon-hyun (Member) left a comment:

+1, LGTM. Merged to master. Thank you, @MaxGekk and all.

dongjoon-hyun added a commit that referenced this pull request Jan 12, 2020
…dk11-results.txt

### What changes were proposed in this pull request?

This PR removes a dangling test result file, `JSONBenchmark-jdk11-results.txt`, which causes a case-sensitivity issue on Mac.

```
$ git clone https://gitbox.apache.org/repos/asf/spark.git spark-gitbox
Cloning into 'spark-gitbox'...
remote: Counting objects: 671717, done.
remote: Compressing objects: 100% (258021/258021), done.
remote: Total 671717 (delta 329181), reused 560390 (delta 228097)
Receiving objects: 100% (671717/671717), 149.69 MiB | 950.00 KiB/s, done.
Resolving deltas: 100% (329181/329181), done.
Updating files: 100% (16090/16090), done.
warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:

  'sql/core/benchmarks/JSONBenchmark-jdk11-results.txt'
  'sql/core/benchmarks/JsonBenchmark-jdk11-results.txt'
```
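For reference, a quick way to list tracked paths that collide case-insensitively (an illustration, not part of the original commit message; assumes GNU or BSD `sort`/`uniq`):

```
# Print tracked paths that are duplicates when compared case-insensitively.
$ git ls-files | sort -f | uniq -di
```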

### Why are the changes needed?

Previously, since the file name didn't match `object JSONBenchmark`, it caused confusion when we ran the benchmark. So, 4e0e4e5 renamed `JSONBenchmark` to `JsonBenchmark`. However, in the same time frame, #26003 regenerated this file.

Recently, #27078 regenerated the results with the correct file name, `JsonBenchmark-jdk11-results.txt`, so we can remove the old one.

### Does this PR introduce any user-facing change?

No. This is a test result.

### How was this patch tested?

Manually check the following correctly generated files in the master. And, check this PR removes the dangling one.
- https://github.com/apache/spark/blob/master/sql/core/benchmarks/JsonBenchmark-results.txt
- https://github.com/apache/spark/blob/master/sql/core/benchmarks/JsonBenchmark-jdk11-results.txt

Closes #27180 from dongjoon-hyun/SPARK-REMOVE.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@SparkQA commented Jan 13, 2020

Test build #116569 has finished for PR 27078 at commit eccde05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
                                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------
10 units w/ interval                           3041           3060          16          0.3        3041.3       0.2X
10 units w/o interval                          3031           3043          15          0.3        3031.2       0.2X
11 units w/ interval                           3270           3280           9          0.3        3269.9       0.2X
11 units w/o interval                          3273           3280           7          0.3        3272.6       0.2X
```
Member Author commented:

This is a ~66% regression. Here is the JIRA for the investigation: https://issues.apache.org/jira/browse/SPARK-30562

Member commented:

Hm, regression vs what here... the change should have made the benchmarks faster, but they were also re-run. Is it the new instance type?

Member Author commented:

There are a few commits to the interval implementation between the two re-runs. The regression could be related to those commits, but not to my changes.

Is it the new instance type?

It is the same.

```
                                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------
Parsing with UTF-8                           120085         121975        1705          0.4        2401.7       0.1X
Text read                                     10249          10314          77          4.9         205.0       1.0X
Schema inferring                              35403          35436          40          1.4         708.1       0.3X
Parsing without charset                       32875          32879           4          1.5         657.5       0.3X
```
Member Author commented:

This became almost 3x faster.

```
                                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------
1 select expressions                              5             13           8          0.0     5370143.0       1.0X
100 select expressions                           12             16           6          0.0    11995425.0       0.4X
2500 select expressions                         211            214           4          0.0   210927791.0       0.0X
```
Member Author commented:

Here there is a ~2x slowdown. I opened a JIRA for the investigation: https://issues.apache.org/jira/browse/SPARK-30564

```
                                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------
100 deep x 1000 rows (exec in-mem)             1283           1283           0          0.1       12830.5       0.0X
100 deep x 1000 rows (read parquet)            1201           1205           7          0.1       12005.2       0.0X
100 deep x 1000 rows (write parquet)            436            443           9          0.2        4361.4       0.1X
250 deep x 400 rows (read in-mem)              1882           1883           1          0.1       18819.9       0.0X
```
```
                                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------
Hive built-in ORC                               520            531           8          2.0         495.8       0.6X
Native ORC MR                                   250            264          15          4.2         238.1       1.0X
Native ORC Vectorized                           121            138          24          8.7         115.5       2.1X
Hive built-in ORC                              1761           1792          43          0.6        1679.3       0.1X
```
Member Author commented:

Here there is a ~3x regression: https://issues.apache.org/jira/browse/SPARK-30565

@dongjoon-hyun (Member) commented:

Thank you for the follow-up, @MaxGekk .

@MaxGekk MaxGekk deleted the noop-in-benchmarks branch June 5, 2020 19:42
cloud-fan pushed a commit that referenced this pull request Mar 9, 2021
### What changes were proposed in this pull request?

`BroadcastNestedLoopJoinExec` does not have code-gen, and we can potentially boost CPU performance for this operator by adding code-gen to it. https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html also showed evidence of this in one fork.

The codegen for `BroadcastNestedLoopJoinExec` shares some code with `HashJoin`, so the interface `JoinCodegenSupport` was created to hold the common logic. This PR only supports inner and cross joins; other join types will be added in follow-up PRs.

Example query and generated code:

```
val df1 = spark.range(4).select($"id".as("k1"))
val df2 = spark.range(3).select($"id".as("k2"))
df1.join(df2, $"k1" + 1 =!= $"k2").explain("codegen")
```

```
== Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:203(0.31% used); numInnerClasses:0) ==
*(2) BroadcastNestedLoopJoin BuildRight, Inner, NOT ((k1#2L + 1) = k2#6L)
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4, step=1, splits=2)
+- BroadcastExchange IdentityBroadcastMode, [id=#22]
   +- *(1) Project [id#4L AS k2#6L]
      +- *(1) Range (0, 3, step=1, splits=2)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private InternalRow[] bnlj_buildRowArray_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */     bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value();
/* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException {
/* 037 */     for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) {
/* 038 */       UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0];
/* 039 */
/* 040 */       long bnlj_value_1 = bnlj_buildRow_0.getLong(0);
/* 041 */
/* 042 */       long bnlj_value_4 = -1L;
/* 043 */
/* 044 */       bnlj_value_4 = bnlj_expr_0_0 + 1L;
/* 045 */
/* 046 */       boolean bnlj_value_3 = false;
/* 047 */       bnlj_value_3 = bnlj_value_4 == bnlj_value_1;
/* 048 */       boolean bnlj_value_2 = false;
/* 049 */       bnlj_value_2 = !(bnlj_value_3);
/* 050 */       if (!(false || !bnlj_value_2))
/* 051 */       {
/* 052 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 053 */
/* 054 */         range_mutableStateArray_0[3].reset();
/* 055 */
/* 056 */         range_mutableStateArray_0[3].write(0, bnlj_expr_0_0);
/* 057 */
/* 058 */         range_mutableStateArray_0[3].write(1, bnlj_value_1);
/* 059 */         append((range_mutableStateArray_0[3].getRow()).copy());
/* 060 */
/* 061 */       }
/* 062 */     }
/* 063 */
/* 064 */   }
/* 065 */
/* 066 */   private void initRange(int idx) {
/* 067 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 068 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 069 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L);
/* 070 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 071 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 072 */     long partitionEnd;
/* 073 */
/* 074 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 075 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 076 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 077 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 078 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 079 */     } else {
/* 080 */       range_nextIndex_0 = st.longValue();
/* 081 */     }
/* 082 */     range_batchEnd_0 = range_nextIndex_0;
/* 083 */
/* 084 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 085 */     .multiply(step).add(start);
/* 086 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 087 */       partitionEnd = Long.MAX_VALUE;
/* 088 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 089 */       partitionEnd = Long.MIN_VALUE;
/* 090 */     } else {
/* 091 */       partitionEnd = end.longValue();
/* 092 */     }
/* 093 */
/* 094 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 095 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 096 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 097 */     if (range_numElementsTodo_0 < 0) {
/* 098 */       range_numElementsTodo_0 = 0;
/* 099 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 100 */       range_numElementsTodo_0++;
/* 101 */     }
/* 102 */   }
/* 103 */
/* 104 */   protected void processNext() throws java.io.IOException {
/* 105 */     // initialize Range
/* 106 */     if (!range_initRange_0) {
/* 107 */       range_initRange_0 = true;
/* 108 */       initRange(partitionIndex);
/* 109 */     }
/* 110 */
/* 111 */     while (true) {
/* 112 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 113 */         long range_nextBatchTodo_0;
/* 114 */         if (range_numElementsTodo_0 > 1000L) {
/* 115 */           range_nextBatchTodo_0 = 1000L;
/* 116 */           range_numElementsTodo_0 -= 1000L;
/* 117 */         } else {
/* 118 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 119 */           range_numElementsTodo_0 = 0;
/* 120 */           if (range_nextBatchTodo_0 == 0) break;
/* 121 */         }
/* 122 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 123 */       }
/* 124 */
/* 125 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 126 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 127 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 128 */
/* 129 */         // common sub-expressions
/* 130 */
/* 131 */         bnlj_doConsume_0(range_value_0);
/* 132 */
/* 133 */         if (shouldStop()) {
/* 134 */           range_nextIndex_0 = range_value_0 + 1L;
/* 135 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 136 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 137 */           return;
/* 138 */         }
/* 139 */
/* 140 */       }
/* 141 */       range_nextIndex_0 = range_batchEnd_0;
/* 142 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 143 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 144 */       range_taskContext_0.killTaskIfInterrupted();
/* 145 */     }
/* 146 */   }
/* 147 */
/* 148 */ }
```

### Why are the changes needed?

Improve query CPU performance. Added a micro benchmark query in `JoinBenchmark.scala`.
Saw a ~2x run time improvement with whole-stage codegen on:

```
OpenJDK 64-Bit Server VM 11.0.9+11-LTS on Linux 4.14.219-161.340.amzn2.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2  2.50GHz
broadcast nested loop join:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
broadcast nested loop join wholestage off          62922          63052         184          0.3        3000.3       1.0X
broadcast nested loop join wholestage on           30946          30972          26          0.7        1475.6       2.0X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

* Added a unit test in `WholeStageCodegenSuite.scala`; existing unit tests cover `BroadcastNestedLoopJoinExec`.
* Updated golden files for several TPCDS query plans, as whole-stage code-gen for `BroadcastNestedLoopJoinExec` is now triggered.
* Updated `JoinBenchmark-jdk11-results.txt` and `JoinBenchmark-results.txt` with the new benchmark results. Followed previous benchmark PRs #27078 and #26003 and used the same type of machine (see the example after the machine spec below):

```
Amazon AWS EC2
type: r3.xlarge
region: us-west-2 (Oregon)
OS: Linux
```
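For reference, regenerating a single benchmark's result files follows the same pattern as the Python script earlier in this thread, e.g.:

```
$ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.JoinBenchmark"
```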

Closes #31736 from c21/nested-join-exec.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>