[SPARK-45071][SQL] Optimize the processing speed of BinaryArithmetic#dataType when processing multi-column data #42804
Closed
Conversation
Force-pushed from 8d357f7 to b250ca4
cc @ulysses-you
wangyum reviewed on Sep 5, 2023
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala (review comment outdated, resolved)
wangyum approved these changes on Sep 5, 2023
cc @cloud-fan
cloud-fan reviewed on Sep 5, 2023
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala (review comment resolved)
cloud-fan approved these changes on Sep 5, 2023
Please merge it to master, thanks.
wangyum pushed a commit that referenced this pull request on Sep 6, 2023 (cherry picked from commit 16e813c)
wangyum pushed a second commit that referenced this pull request on Sep 6, 2023 (also cherry picked from commit 16e813c)
Merged to master, branch-3.5 and branch-3.4.
viirya pushed a commit to viirya/spark-1 that referenced this pull request on Oct 19, 2023
zml1206 pushed a commit to zml1206/spark that referenced this pull request on May 7, 2025
What changes were proposed in this pull request?
Since `BinaryArithmetic#dataType` recursively processes the data type of each node, the driver becomes very slow when multiple columns are processed. For example, the following code:
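```scala
import spark.implicits._

import scala.util.Random

import org.apache.spark.sql.Row
// expr is used in the last line but was missing from the original import
// list; both expr and sum are imported here so the snippet runs as-is.
import org.apache.spark.sql.functions.{expr, sum}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val N = 30
val M = 100
val columns = Seq.fill(N)(Random.alphanumeric.take(8).mkString)
val data = Seq.fill(M)(Seq.fill(N)(Random.nextInt(16) - 5))
val schema = StructType(columns.map(StructField(_, IntegerType)))
val rdd = spark.sparkContext.parallelize(data.map(Row.fromSeq(_)))
val df = spark.createDataFrame(rdd, schema)
val colExprs = columns.map(sum(_))

// Generate a new column that adds up the other 30 columns.
df.withColumn("new_col_sum", expr(columns.mkString(" + ")))
```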
On Spark 3.4 this code takes a few minutes of driver time to execute, while on Spark 3.2 it takes only a few seconds. Related issue: SPARK-39316
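For intuition: `columns.mkString(" + ")` parses into a left-deep chain of `Add` nodes, and if every node's `dataType` re-derives its children's types on each call, resolving the root re-walks each subtree, which is quadratic in the number of columns. The actual patch lives in `arithmetic.scala` and differs in detail; the toy sketch below uses made-up classes (not Spark's real expression API) purely to illustrate the memoization idea:

```scala
// Toy expression tree for illustration only -- these are NOT Spark's real
// classes, and widerType is a stand-in for Spark's type coercion rules.
sealed trait Expr { def dataType: String }

case class Literal(dataType: String) extends Expr

case class Add(left: Expr, right: Expr) extends Expr {
  // Before (conceptually): `def dataType` re-derives the type on every
  // call, so asking the root of a left-deep chain of N Adds for its type
  // re-walks each subtree -- O(N^2) work during analysis:
  //   def dataType: String = widerType(left.dataType, right.dataType)

  // After (conceptually): caching the result means each node computes its
  // type once, so resolving the whole chain is O(N).
  lazy val dataType: String = widerType(left.dataType, right.dataType)

  private def widerType(a: String, b: String): String =
    if (a == b) a else "double"
}

// columns.mkString(" + ") parses into exactly this shape: a left-deep
// chain Add(Add(Add(c1, c2), c3), ...).
val chain = (1 to 30).map(_ => Literal("int"): Expr).reduceLeft(Add(_, _))
println(chain.dataType) // "int", each node's type computed exactly once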
Why are the changes needed?
Optimizes the processing speed of `BinaryArithmetic#dataType` when processing multi-column data.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manual testing.
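For reference, one way to reproduce the manual test is to time schema resolution of the repro on the driver. This is a sketch assuming the `df` and `columns` values defined in the snippet above; `spark.time` is a standard `SparkSession` helper:

```scala
// Accessing .schema forces analysis, which is where dataType is computed;
// the description reports minutes on Spark 3.4 vs. seconds on Spark 3.2.
spark.time {
  df.withColumn("new_col_sum", expr(columns.mkString(" + "))).schema
}
```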
Was this patch authored or co-authored using generative AI tooling?
No.