[SPARK-45071][SQL] Optimize the processing speed of BinaryArithmetic#dataType when processing multi-column data #42804


Closed
wants to merge 2 commits

Conversation

zzzzming95 (Contributor) commented Sep 4, 2023

What changes were proposed in this pull request?

Because `BinaryArithmetic#dataType` recursively recomputes the data type of every child node, the driver becomes very slow when an expression references many columns.

For example, the following code:

import spark.implicits._
import scala.util.Random
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{expr, sum}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val N = 30
val M = 100

val columns = Seq.fill(N)(Random.alphanumeric.take(8).mkString)
val data = Seq.fill(M)(Seq.fill(N)(Random.nextInt(16) - 5))

val schema = StructType(columns.map(StructField(_, IntegerType)))
val rdd = spark.sparkContext.parallelize(data.map(Row.fromSeq(_)))
val df = spark.createDataFrame(rdd, schema)
val colExprs = columns.map(sum(_))

// generate a new column that adds up the other 30 columns
df.withColumn("new_col_sum", expr(columns.mkString(" + ")))

With Spark 3.4 the driver takes a few minutes to execute this code, whereas with Spark 3.2 it takes only a few seconds. Related issue: SPARK-39316
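
The slowdown comes from the shape of the parsed expression: `expr(columns.mkString(" + "))` produces a left-deep chain of `Add` nodes, and an uncached, recursive `dataType` re-walks that chain on every call. The sketch below is a self-contained model of that pattern, not Spark's actual `BinaryArithmetic` code; it assumes an implementation that consults a child's type more than once per call (for example, once for a decimal-specific check and once for the default branch), which makes the cost grow roughly as 2^depth, while memoizing the result with a `lazy val` keeps it linear in the number of nodes.

```scala
// Self-contained model of the recursion pattern; these are NOT Spark's classes.
sealed trait Expr { def dataType: String }

final case class Leaf(name: String) extends Expr {
  def dataType: String = "int"
}

// Uncached: every call recomputes the children's types, and the left child is
// consulted twice (a hypothetical decimal check plus the default branch), so a
// left-deep chain of depth d costs on the order of 2^d calls.
final case class AddUncached(left: Expr, right: Expr) extends Expr {
  def dataType: String = {
    val l = left.dataType
    val r = right.dataType
    if (l == "decimal" && r == "decimal") "decimal"
    else left.dataType // second traversal of the left subtree
  }
}

// Cached: the same computation memoized with a lazy val, one visit per node.
final case class AddCached(left: Expr, right: Expr) extends Expr {
  lazy val dataType: String = {
    val l = left.dataType
    val r = right.dataType
    if (l == "decimal" && r == "decimal") "decimal" else l
  }
}

object DataTypeCostDemo extends App {
  val n = 30 // number of columns, as in the reproduction above
  val leaves = (1 to n).map(i => Leaf(s"c$i"))

  val cached = leaves.reduceLeft[Expr](AddCached(_, _))
  println(cached.dataType)   // fast: each node is computed once

  val uncached = leaves.reduceLeft[Expr](AddUncached(_, _))
  println(uncached.dataType) // roughly 2^(n-1) recursive calls; noticeably slow
}
```

Whether the actual fix caches the value, restructures the checks, or both is a detail of the PR diff; the sketch only illustrates why avoiding repeated recursion over the same subtree turns minutes back into seconds.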

Why are the changes needed?

Optimize the processing speed of BinaryArithmetic#dataType when processing multi-column data

Does this PR introduce any user-facing change?

No

How was this patch tested?

manual testing
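
As a rough guide, the snippet below sketches one way such a manual check can be timed from the Spark shell; it reuses `df` and `columns` from the reproduction above and is an illustration, not necessarily the exact procedure the author followed. Forcing `queryExecution.analyzed` makes the driver resolve the expression tree (the step where `BinaryArithmetic#dataType` is exercised) without running the job, so the same measurement can be compared on Spark 3.2, Spark 3.4, and a build with this change.

```scala
import org.apache.spark.sql.functions.expr

// Simple wall-clock timer for driver-side work.
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.2f s")
  result
}

// Resolve the 30-column sum expression on the driver only.
time("analyze 30-column sum expression") {
  df.withColumn("new_col_sum", expr(columns.mkString(" + ")))
    .queryExecution.analyzed
}
```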

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions github-actions bot added the SQL label Sep 4, 2023
zzzzming95 (Contributor, Author)

@ulysses-you cc

wangyum (Member) commented Sep 5, 2023

cc @cloud-fan

zzzzming95 (Contributor, Author)

@cloud-fan @wangyum Please merge it to master, thanks.

@wangyum wangyum closed this in 16e813c Sep 6, 2023
wangyum pushed a commit that referenced this pull request Sep 6, 2023
[SPARK-45071][SQL] Optimize the processing speed of `BinaryArithmetic#dataType` when processing multi-column data

Closes #42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 16e813c)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum pushed a commit that referenced this pull request Sep 6, 2023
[SPARK-45071][SQL] Optimize the processing speed of `BinaryArithmetic#dataType` when processing multi-column data

Closes #42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 16e813c)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum (Member) commented Sep 6, 2023

Merged to master, branch-3.5 and branch-3.4.

viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023
[SPARK-45071][SQL] Optimize the processing speed of `BinaryArithmetic#dataType` when processing multi-column data

Closes apache#42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 16e813c)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit a96804b)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
zml1206 pushed a commit to zml1206/spark that referenced this pull request May 7, 2025
[SPARK-45071][SQL] Optimize the processing speed of `BinaryArithmetic#dataType` when processing multi-column data

Closes apache#42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>