Skip to content

Include SparkR #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 6, 2017
Merged

Include SparkR #92

merged 2 commits into from
Jan 6, 2017

Conversation

robert3005
Copy link

add SparkR to dist.

made the parameter lists uniform everywhere

@robert3005
Copy link
Author

Will wait for build finish and check the dists

@robert3005
Copy link
Author

Checked locally and dist looks ok and has spark R. Will merge and we can do another pr if something isn't right.

@robert3005 robert3005 merged commit f762e6b into master Jan 6, 2017
@robert3005 robert3005 deleted the rk/sparkr branch January 6, 2017 23:08
mattsills pushed a commit to mattsills/spark that referenced this pull request Jul 17, 2020
…tions

### What changes were proposed in this pull request?

In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution, which causes some bucket map join lose efficacy and add more `ShuffleExchange`.

How to reproduce:
```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// Since 3.0. We enabled adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger values to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=palantir#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```
vs
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=palantir#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=palantir#92]
            +- Range (0, 8, step=1, splits=16)
```

This PR makes read bucketed tables always obeys `spark.sql.shuffle.partitions` even enabling adaptive execution and set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` to avoid add more `ShuffleExchange`.

### Why are the changes needed?
Do not degrade performance after enabling adaptive execution.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit test.

Closes apache#26409 from wangyum/SPARK-29655.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant