Spark 24914 working #3
Conversation
It took me a little while to get my reproduction case (inappropriate broadcast that results in OOM) working with Spark 3.0. So I got started a little late.
Anyway, I've started playing around and reviewing, and I will continue tomorrow.
For my parquet test case, I noticed that the deserFactor seemed rather low (it was 2, when I expected more like 10). For the ORC case, it seemed spot on (751).
This is my repro case for parquet:
```scala
// ./bin/spark-shell --driver-memory 12g
// create data
import scala.util.Random
val rand = new Random
val df = Seq.tabulate(5000000) { x => ("hello there", 20, 20, 20, rand.nextLong % 20000) }.toDF("a", "b", "c", "d", "e")
df.createOrReplaceTempView("df")
sql("drop table if exists fred")
sql("create table fred stored as parquet as select * from df")

val df = spark.range(0, 2000000).map(x => (x, x + 1, x + 2, x + 3)).toDF("a", "b", "c", "d")
df.createOrReplaceTempView("df")
sql("drop table if exists phil")
sql("create table phil stored as parquet as select * from df")

// ./bin/spark-shell --master "local-cluster[6, 1, 4096]" --driver-memory 1g
// run test
sql("select f.a, f.b, f.c, f.d, p.c from fred f, phil p where f.b = p.a").show
```
This is my repro case with ORC. In this case, I had a smaller TextFile table (but whose files are bigger) and a bigger ORC table (but whose files are smaller):
```scala
// ./bin/spark-shell --driver-memory 12g
// create data
val df = spark.range(0, 5000000).map { x => (x, x + 1, x + 2, x + 3)}.toDF("a", "b", "c", "d")
df.createOrReplaceTempView("df")
sql("drop table if exists fredorc")
sql("create table fredorc stored as orc as select * from df")

val df = spark.range(0, 2000000).map { x => (x, x + 1, x + 2, x + 3)}.toDF("a", "b", "c", "d")
df.createOrReplaceTempView("df")
sql("drop table if exists philtxt")
sql("create table philtxt stored as textfile as select * from df")

// ./bin/spark-shell --master "local-cluster[6, 1, 4096]" --driver-memory 1g
// run test
sql("select f.a, f.b, f.c, f.d, p.c from fredorc f, philtxt p where f.b = p.a").show
```
val newTotalSize = CommandUtils.calculateLocationSize(
  sessionState, tableMeta.identifier, p.storage.locationUri)
val totalSizeWithDeserFact = CommandUtils.calculateLocationSize(
  sparkSession.sessionState,
Why did this change?
as the result is not just total size (but I can revert this)
You probably meant the indentation, so I have reverted that.
I meant sparkSession.sessionState vs sessionState.
if (cboEnabled && rowCount.isDefined) {
  val attrStats = AttributeMap(planOutput
    .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
  // Estimate size as number of rows * row size.
  val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
  Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
  Statistics(
    sizeInBytes = applyDeserFactor(size, deserFactorDistortion),
I'm not sure we want to apply deserFactor to size when CBO is enabled, as I think that code already tries to calculate the size based on expected column size and number of rows (not based on raw file size). You might want to check how EstimationUtils.getOutputSize calculates size and then see if you really need this here.
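For context, here is a minimal standalone sketch of what a row-count-based estimate looks like. This is a simplification, not Spark's actual `EstimationUtils.getOutputSize` implementation: the point is that the number is already derived from the row count and per-column widths (an in-memory estimate), so scaling it again by deserFactor could double-count the effect.

```scala
case class ColumnStat(avgLen: Long) // hypothetical, minimal column statistic

def estimateOutputSize(rowCount: BigInt, columns: Seq[ColumnStat]): BigInt = {
  val rowOverheadBytes = 8L // assumed fixed per-row overhead
  val rowSize = rowOverheadBytes + columns.map(_.avgLen).sum
  rowCount * BigInt(rowSize) // already an in-memory estimate, not a file size
}

// e.g. 2,000,000 rows of four bigint columns:
// estimateOutputSize(BigInt(2000000), Seq.fill(4)(ColumnStat(8)))  // = 80,000,000 bytes
```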
Thanks! Great finding! :)
val DESERIALIZATION_FACTOR_CALC_ENABLED =
  buildConf("spark.sql.statistics.deserFactor.calc.enabled")
    .doc("Enables the calculation of the deserialization factor as a table statistic. " +
      "This factor is calculated for columnar storage formats by dividing the raw byte size " +
We might need to touch up the description a little bit, but I can't think of anything good at this time of night :).
For this one piece, maybe "This factor is calculated for columnar storage formats as a ratio of actual data size to raw file size".
.doc("Enables the calculation of the deserialization factor as a table statistic. " + | ||
"This factor is calculated for columnar storage formats by dividing the raw byte size " + | ||
"of uncompressed data with the file size. It is used for scaling up size in bytes " + | ||
"statistic which leads to better a estimate of in-memory data size which effects " + |
Thinking out loud:
"Spark uses this ratio to scale up the estimated size, which leads to better estimate of in-memory data size and improves the query optimization (i.e., join strategy). Spark stores a ratio, rather than the data size, so that the table can grow without having to recompute statistics"
"In case of partitioned table the maximum of these factors is taken. " + | ||
"When the factor is already calculated (and stored in the meta store) but the " + | ||
"calculation is disabled in a subsequent ANALYZE TABLE (by setting this config to false) " + | ||
"then the old factor will be applied as this factor can be removed only by TRUNCATE or a " + |
The other thing you could do is ignore the deserFactor when calculating statistics at query time if this config setting is not true. Maybe that makes the code messy (having to check SQLConf in inconvenient places).
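A rough, standalone sketch of that suggestion; the helper names below are stand-ins, not the PR's actual code, and the flag check is shown as a plain boolean rather than a real SQLConf lookup:

```scala
// Only scale the raw size by the stored factor when the feature flag is on.
def applyDeserFactor(rawSize: BigInt, factor: Option[BigInt]): BigInt =
  factor.map(rawSize * _).getOrElse(rawSize)

def effectiveSize(rawSize: BigInt, factor: Option[BigInt], calcEnabled: Boolean): BigInt =
  if (calcEnabled) applyDeserFactor(rawSize, factor) else rawSize

// effectiveSize(BigInt(4L * 1024 * 1024), Some(BigInt(751)), calcEnabled = false) // unscaled
```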
@bersprockets Thanks for the help! Regarding parquet, I have generated the table on my machine and it is 8 files:
Now if you take the number of rows (2000000) based on:
And multiply it by the number of columns (4) and by the memory size used to store one bigint (8 bytes), then divide it by the number of files, you get 8,000,000 bytes. So deserFact=2 seems to me the right value.
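For reference, the same back-of-the-envelope arithmetic as a small sketch (numbers taken from the comment above; the on-disk per-file size itself comes from the directory listing, which isn't reproduced here):

```scala
// Expected in-memory bytes per file for the `phil` table.
val numRows        = 2000000L
val numColumns     = 4L
val bytesPerBigint = 8L
val numFiles       = 8L

val inMemoryBytesPerFile = numRows * numColumns * bytesPerBigint / numFiles // 8,000,000
// deserFactor ≈ inMemoryBytesPerFile / onDiskBytesPerFile, so a factor of 2
// corresponds to parquet files of roughly 4 MB each.
```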
I was thinking more of this case, where there is a lot of repeating data:

```scala
import scala.util.Random
val rand = new Random
val df = Seq.tabulate(5000000) { x => ("hello there", 20, 20, 20, rand.nextLong % 20000) }.toDF("a", "b", "c", "d", "e")
```

In this case, the ratio is more like 10, but the deserFactor is calculated as 2.
val catalogTable2 = getCatalogTable(table)
assert(catalogTable2.stats.isDefined &&
  catalogTable2.stats.get.deserFactor.isDefined)
assert(catalogTable2.stats.get.deserFactor.get > deserFactorBefore,
The test implies that the size of the data impacts the deserFactor (ratio), when in reality (I believe) it's the make-up of the data: whether the data is incrementing sequentially, repeating, or random. Is that not the case?
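A quick way to probe that question (an illustrative sketch, not part of the PR's test suite): build two tables with the same row count and schema but different data make-up, and compare their on-disk sizes. Table names and sizes here are just examples.

```scala
// Columnar formats compress the repetitive data far better, so the ratio of
// in-memory size to file size should differ even though the logical amount of
// data is the same.
import scala.util.Random
val rand = new Random

val repetitive = Seq.tabulate(1000000)(_ => ("hello there", 20L)).toDF("a", "b")
val random     = Seq.tabulate(1000000)(_ => (rand.nextString(11), rand.nextLong)).toDF("a", "b")

repetitive.write.mode("overwrite").format("parquet").saveAsTable("deser_repetitive")
random.write.mode("overwrite").format("parquet").saveAsTable("deser_random")
// Compare the sizes of the two tables' directories to see the effect of data make-up.
```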
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") | ||
checkDeserializationFactor(table, exists = false) | ||
val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key") | ||
checkNumBroadcastHashJoins(res, 1, |
We could instead just check that sizeInBytes == origSizeInBytes.
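A sketch of what that alternative check could look like, assuming a test/spark-shell context where `spark` is in scope; `origSizeInBytes` is a hypothetical name for the value captured before the second ANALYZE TABLE, and the suite's own helpers may read the statistic differently:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Read the size statistic straight from the session catalog.
def catalogSizeInBytes(tableName: String): Option[BigInt] =
  spark.sessionState.catalog
    .getTableMetadata(TableIdentifier(tableName))
    .stats
    .map(_.sizeInBytes)

// val origSizeInBytes = catalogSizeInBytes("sizeTest")   // before the re-analyze
// ... ANALYZE TABLE sizeTest COMPUTE STATISTICS (factor calculation disabled) ...
// assert(catalogSizeInBytes("sizeTest") == origSizeInBytes)
```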
I would keep this assert as it follows the same pattern used in the previous cases, too.
: Unit = test(s"SPARK-24914 - test deserialization factor ($fileformat)") { | ||
val table = s"sizeTest" | ||
|
||
withTable(table) { |
Not sure how important this is for the tests, but my memory from an escalation is that Spark calculates sizeInBytes differently depending on whether the table is partitioned or not.
Here's my recollection (for ORC, anyway); a quick way to check the planner's estimate is sketched right after this list:
- Not partitioned: Spark uses totalSize
- Partitioned and spark.sql.hive.convertMetastoreOrc=true: Spark prunes the partitions and sums the size of the files in the relevant partitions.
- Partitioned and spark.sql.hive.convertMetastoreOrc=false: Spark uses totalSize
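A spark-shell sketch for checking which estimate the planner actually ends up with in each case; `some_table` is a placeholder, and for the partitioned convertMetastoreOrc=true case you would add a partition predicate to see pruning reflected in the number:

```scala
// Inspect the sizeInBytes the optimizer uses for a given query.
val plan = sql("SELECT * FROM some_table").queryExecution.optimizedPlan
println(plan.stats.sizeInBytes)

// Repeat with the conversion disabled to compare the two code paths.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
println(sql("SELECT * FROM some_table").queryExecution.optimizedPlan.stats.sizeInBytes)
```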
I was trying to figure out a few things, since my memory has faded. Here are my notes as I looked at some code (in the context of parquet tables, but I assume the same applies for ORC):

**Non-partitioned Hive table with statistics**

Spark gets the statistics from totalSize in the Hive table. When the user inserts new rows into the table, Spark updates totalSize in Hive (at least it does on my laptop). I assume insert operations from Hive will also update totalSize (although I have not tested that). However, if you do something whacky like the following, the statistics get out-of-date (as you would expect):

```scala
import scala.util.Random
val rand = new Random
val df = spark.range(0, 5000000).map { x => ("hello there", 20, 20, 20, rand.nextLong % 20000) }.toDF("a", "b", "c", "d", "e")
df.write.mode("append").format("parquet").save("spark-warehouse/desertest")
```

**Non-partitioned Hive table with no statistics**

In this case, Spark gets the sizeInBytes from the HadoopFsRelation (relation.sizeInBytes) in the LogicalRelation, which in turn gets the table size from location (a FileIndex, or more specifically, InMemoryFileIndex).

So it seems that when a Hive table has no statistics, Spark gets sizeInBytes from the sum of the current file sizes times the fileCompressionFactor. I guess in this case Spark will always be up-to-date with the file sizes. That is, when files are dumped into the table's directory (e.g., my example above), sizeInBytes will reflect the change. If we had an external tool to calculate deserFactor, or an option on ANALYZE TABLE, we could provide a deserFactor in this case as well.
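Since the no-statistics path multiplies the summed file sizes by fileCompressionFactor, here is a small sketch of how that existing, session-wide knob could be used today as a blunt stand-in for a per-table deserFactor. The path reuses the append example above, and the value 10.0 is purely illustrative:

```scala
// Scale the file-size-based estimate for stats-less, file-based relations.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "10.0")
val noStatsDf = spark.read.parquet("spark-warehouse/desertest")
println(noStatsDf.queryExecution.optimizedPlan.stats.sizeInBytes)
// ≈ (sum of the parquet file sizes) * 10, since this relation has no catalog
// statistics to fall back on.
```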
Unfortunately, regarding parquet: probably in the first round we will target only the ORC format.
Got it.
### What changes were proposed in this pull request?
`org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` failed lately. After having a look at the logs, it just shows the following fact without any details:
```
Caused by: sbt.ForkMain$ForkError: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - Server not found in Kerberos database
```
Since the issue is intermittent and we are not able to reproduce it, we should add more debug information and wait for reproduction with the extended logs.

### Why are the changes needed?
The failing test doesn't give enough debug information.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
I've started the test manually and checked that such additional debug messages show up:
```
>>> KrbApReq: APOptions are 00000000 00000000 00000000 00000000
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Looking for keys for: kafka/localhostEXAMPLE.COM
Added key: 17version: 0
Added key: 23version: 0
Added key: 16version: 0
Found unsupported keytype (3) for kafka/localhostEXAMPLE.COM
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Using builtin default etypes for permitted_enctypes
default etypes for permitted_enctypes: 17 16 23.
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
MemoryCache: add 1571936500/174770/16C565221B70AAB2BEFE31A83D13A2F4/client/localhostEXAMPLE.COM to client/localhostEXAMPLE.COM|kafka/localhostEXAMPLE.COM
MemoryCache: Existing AuthList:
#3: 1571936493/200803/8CD70D280B0862C5DA1FF901ECAD39FE/client/localhostEXAMPLE.COM
#2: 1571936499/985009/BAD33290D079DD4E3579A8686EC326B7/client/localhostEXAMPLE.COM
#1: 1571936499/995208/B76B9D78A9BE283AC78340157107FD40/client/localhostEXAMPLE.COM
```

Closes apache#26252 from gaborgsomogyi/SPARK-29580.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR proposes to make `PythonFunction` hold `Seq[Byte]` instead of `Array[Byte]` to be able to compare whether the byte arrays have the same values for the cache manager.

### Why are the changes needed?
Currently the cache manager doesn't use the cache for `udf` if the `udf` is created again, even if the function is the same.

```py
>>> func = lambda x: x
>>> df = spark.range(1)
>>> df.select(udf(func)("id")).cache()
```
```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#14 AS <lambda>(id)apache#12]
+- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#14]
   +- *(1) Range (0, 1, step=1, splits=12)
```

This is because `PythonFunction` holds `Array[Byte]`, and the `equals` method of arrays returns true only when both arrays are the same instance.

### Does this PR introduce _any_ user-facing change?
Yes, if the user reuses the Python function for the UDF, the cache manager will detect the same function and use the cache for it.

### How was this patch tested?
I added a test case and tested manually.

```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
InMemoryTableScan [<lambda>(id)apache#12]
   +- InMemoryRelation [<lambda>(id)apache#12], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [pythonUDF0#5 AS <lambda>(id)#3]
            +- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#5]
               +- *(1) Range (0, 1, step=1, splits=12)
```

Closes apache#28774 from ueshin/issues/SPARK-31945/udf_cache.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
… without WindowExpression

### What changes were proposed in this pull request?
Add a WindowFunction check at `CheckAnalysis`.

### Why are the changes needed?
Provide a friendly error msg.

**BEFORE**
```scala
scala> sql("select rank() from values(1)").show
java.lang.UnsupportedOperationException: Cannot generate code for expression: rank()
```

**AFTER**
```scala
scala> sql("select rank() from values(1)").show
org.apache.spark.sql.AnalysisException: Window function rank() requires an OVER clause.;;
Project [rank() AS RANK()#3]
+- LocalRelation [col1#2]
```

### Does this PR introduce _any_ user-facing change?
Yes, the user will be given a better error msg.

### How was this patch tested?
Pass the newly added UT.

Closes apache#28808 from ulysses-you/SPARK-31975.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ly equivalent children in `RewriteDistinctAggregates`

### What changes were proposed in this pull request?
In `RewriteDistinctAggregates`, when grouping aggregate expressions by function children, treat children that are semantically equivalent as the same.

### Why are the changes needed?
This PR will reduce the number of projections in the Expand operator when there are multiple distinct aggregations with superficially different children. In some cases, it will eliminate the need for an Expand operator.

Example: In the following query, the Expand operator creates 3*n rows (where n is the number of incoming rows) because it has a projection for each of function children `b + 1`, `1 + b` and `c`.
```
create or replace temp view v1 as
select * from values
(1, 2, 3.0),
(1, 3, 4.0),
(2, 4, 2.5),
(2, 3, 1.0)
v1(a, b, c);

select
  a,
  count(distinct b + 1),
  avg(distinct 1 + b) filter (where c > 0),
  sum(c)
from v1
group by a;
```
The Expand operator has three projections (each producing a row for each incoming row):
```
[a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
[a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
[a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
```
In reality, the Expand only needs one projection for `1 + b` and `b + 1`, because they are semantically equivalent.

With the proposed change, the Expand operator's projections look like this:
```
[a#67, null, 0, null, UnscaledValue(c#69)], <== projection #1 (for regular aggregations)
[a#67, (b#68 + 1), 1, (c#69 > 0.0), null]], <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
```
With one less projection, Expand produces 2*n rows instead of 3*n rows, but still produces the correct result.

In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.

Benchmark code in the JIRA (SPARK-40382).

Before the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                       14721          14859         195         5.7         175.5       1.0X
some semantically equivalent                      14569          14572           5         5.8         173.7       1.0X
none semantically equivalent                      14408          14488         113         5.8         171.8       1.0X
```
After the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                        3658           3692          49        22.9          43.6       1.0X
some semantically equivalent                       9124           9214         127         9.2         108.8       0.4X
none semantically equivalent                      14601          14777         250         5.7         174.1       0.3X
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New unit tests.

Closes apache#37825 from bersprockets/rewritedistinct_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…edExpression()

### What changes were proposed in this pull request?
In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?
This fixes a regression caused by apache#39010, which added the `supportedExpression()` guard to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions to go out of sync, eventually resulting in a query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?
This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?
Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of a user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
No description provided.