Spark 24914 working #3

Closed
wants to merge 7 commits into from

Conversation

attilapiros (Owner)

No description provided.


@bersprockets bersprockets left a comment


It took me a little while to get my reproduction case (an inappropriate broadcast that results in an OOM) working with Spark 3.0, so I got started a little late.

Anyway, I've started playing around and reviewing, and I will continue tomorrow.

For my parquet test case, I noticed that the deserFactor seemed rather low (it was 2, when I expected more like 10). For the orc case, it seemed spot on (751).

This is my repro case for Parquet:

// ./bin/spark-shell --driver-memory 12g
// create data
import scala.util.Random
val rand = new Random
val df = Seq.tabulate(5000000) { x => ("hello there", 20, 20, 20, rand.nextLong % 20000) }.toDF("a", "b", "c", "d", "e")
df.createOrReplaceTempView("df")
sql("drop table if exists fred")
sql("create table fred stored as parquet as select * from df")

val df = spark.range(0, 2000000).map(x => (x, x + 1, x + 2, x + 3)).toDF("a", "b", "c", "d")
df.createOrReplaceTempView("df")
sql("drop table if exists phil")
sql("create table phil stored as parquet as select * from df")


// ./bin/spark-shell --master "local-cluster[6, 1, 4096]" --driver-memory 1g
// run test
sql("select f.a, f.b, f.c, f.d, p.c from fred f, phil p where f.b = p.a").show

This is my repro case with ORC. In this case, I had a smaller text-file table (but whose files are bigger) and a bigger ORC table (but whose files are smaller).

// ./bin/spark-shell --driver-memory 12g
// create data
val df = spark.range(0, 5000000).map { x => (x, x + 1, x + 2, x + 3)}.toDF("a", "b", "c", "d")
df.createOrReplaceTempView("df")
sql("drop table if exists fredorc")
sql("create table fredorc stored as orc as select * from df")

val df = spark.range(0, 2000000).map { x => (x, x + 1, x + 2, x + 3)}.toDF("a", "b", "c", "d")
df.createOrReplaceTempView("df")
sql("drop table if exists philtxt")
sql("create table philtxt stored as textfile as select * from df")


// ./bin/spark-shell --master "local-cluster[6, 1, 4096]" --driver-memory 1g
// run test
sql("select f.a, f.b, f.c, f.d, p.c from fredorc f, philtxt p where f.b = p.a").show

val newTotalSize = CommandUtils.calculateLocationSize(
  sessionState, tableMeta.identifier, p.storage.locationUri)
val totalSizeWithDeserFact = CommandUtils.calculateLocationSize(
  sparkSession.sessionState,


Why did this change?

attilapiros (Owner, Author)


Because the result is not just the total size (but I can revert this).

attilapiros (Owner, Author)


You probably meant the indentation, so I have reverted that.


I meant sparkSession.sessionState vs sessionState.

if (cboEnabled && rowCount.isDefined) {
  val attrStats = AttributeMap(planOutput
    .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
  // Estimate size as number of rows * row size.
  val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
  Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
  Statistics(
    sizeInBytes = applyDeserFactor(size, deserFactorDistortion),


I'm not sure we want to apply deserFactor to size when CBO is enabled, as I think that code already tries to calculate the size based on expected column size and number of rows (not based on raw file size). You might want to check how EstimationUtils.getOutputSize calculates size and then see if you really need this here.
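For illustration, the shape of the suggested fix might look like this (a rough sketch only: `applyDeserFactor` and `deserFactorDistortion` come from this PR, while `totalSize` is a stand-in for the raw-file-size estimate used on the non-CBO path):

```scala
if (cboEnabled && rowCount.isDefined) {
  // CBO already estimates the size from rowCount and per-column statistics via
  // EstimationUtils.getOutputSize, so the file-size-based factor is arguably redundant here.
  Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
} else {
  // Only the non-CBO branch derives sizeInBytes from the raw (compressed) file size,
  // so this is where scaling by the deserialization factor still helps.
  Statistics(sizeInBytes = applyDeserFactor(totalSize, deserFactorDistortion))
}
```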

attilapiros (Owner, Author)


Thanks! Great finding! :)

val DESERIALIZATION_FACTOR_CALC_ENABLED =
buildConf("spark.sql.statistics.deserFactor.calc.enabled")
.doc("Enables the calculation of the deserialization factor as a table statistic. " +
"This factor is calculated for columnar storage formats by dividing the raw byte size " +


We might need to touch up the description a little bit, but I can't think of anything good at this time of night :).

For this one piece, maybe "This factor is calculated for columnar storage formats as a ratio of actual data size to raw file size".

.doc("Enables the calculation of the deserialization factor as a table statistic. " +
"This factor is calculated for columnar storage formats by dividing the raw byte size " +
"of uncompressed data with the file size. It is used for scaling up size in bytes " +
"statistic which leads to better a estimate of in-memory data size which effects " +


Thinking out loud:
"Spark uses this ratio to scale up the estimated size, which leads to better estimate of in-memory data size and improves the query optimization (i.e., join strategy). Spark stores a ratio, rather than the data size, so that the table can grow without having to recompute statistics"

"In case of partitioned table the maximum of these factors is taken. " +
"When the factor is already calculated (and stored in the meta store) but the " +
"calculation is disabled in a subsequent ANALYZE TABLE (by setting this config to false) " +
"then the old factor will be applied as this factor can be removed only by TRUNCATE or a " +


The other thing you could do is ignore the deserFactor when calculating statistics at query time if this config setting is not true. Maybe that makes the code messy (having to check SQLConf in inconvenient places).
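A rough sketch of what that query-time check might look like (names follow this PR: `DESERIALIZATION_FACTOR_CALC_ENABLED` is the new conf entry and `applyDeserFactor` the new helper; `rawSize` is a stand-in for the file-size-based estimate):

```scala
// Sketch: ignore the stored factor at query time when the feature is disabled.
val sizeInBytes =
  if (conf.getConf(SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED)) {
    applyDeserFactor(rawSize, deserFactorDistortion)
  } else {
    rawSize
  }
```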

attilapiros (Owner, Author) commented Sep 26, 2019

@bersprockets Thanks for the help!

Regarding Parquet, I have generated the table on my machine and it is 8 files:

$ stat -f "%N %Z" *.parquet
part-00000-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4004493
part-00001-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003644
part-00002-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003780
part-00003-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003712
part-00004-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003696
part-00005-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003772
part-00006-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003744
part-00007-633caa1d-0199-419f-b623-1ecac7d62328-c000.snappy.parquet 4003704

Now if you take the number of rows, 2,000,000, based on:

val df = spark.range(0, 2000000).map(x => (x, x + 1, x + 2, x + 3)).toDF("a", "b", "c", "d")

And multiply it by the number of columns (4) and by the memory size used to store one bigint (8 bytes), then divide it by the number of files, you get 8,000,000 bytes per file.
That is a bit under double a single file size of about 4,003,704 bytes.

So deserFactor=2 seems to me to be the right value.
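The same arithmetic as a quick sanity check (the file size is taken from the listing above):

```scala
// 2,000,000 rows * 4 columns * 8 bytes per bigint, spread over 8 files.
val rawBytesPerFile = 2000000L * 4 * 8 / 8               // 8,000,000
val observedFileSize = 4003704L                           // one snappy Parquet part file from the listing
val ratio = rawBytesPerFile.toDouble / observedFileSize   // ~2.0, matching deserFactor = 2
```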

@bersprockets

val df = spark.range(0, 2000000).map(x => (x, x + 1, x + 2, x + 3)).toDF("a", "b", "c", "d")

I was thinking more of this case, where there is a lot of repeating data:

import scala.util.Random
val rand = new Random
val df = Seq.tabulate(5000000) { x => ("hello there", 20, 20, 20, rand.nextLong % 20000) }.toDF("a", "b", "c", "d", "e")

In this case, the ratio is more like 10, but the deserFactor is calculated as 2.

val catalogTable2 = getCatalogTable(table)
assert(catalogTable2.stats.isDefined &&
catalogTable2.stats.get.deserFactor.isDefined)
assert(catalogTable2.stats.get.deserFactor.get > deserFactorBefore,

@bersprockets bersprockets Sep 27, 2019


The test implies that the size of the data impacts the deserFactor (ratio), when in reality (I believe) it's the make-up of the data: whether the data is incrementing sequentially, repeating, or random. Is that not the case?

sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
checkDeserializationFactor(table, exists = false)
val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key")
checkNumBroadcastHashJoins(res, 1,


We could instead just check that sizeInBytes == origSizeInBytes.

attilapiros (Owner, Author)


I would keep this assert as it follows the same pattern used in the previous cases, too.

: Unit = test(s"SPARK-24914 - test deserialization factor ($fileformat)") {
val table = s"sizeTest"

withTable(table) {


Not sure how important this is for the tests, but my memory from an escalation is that Spark calculates sizeInBytes differently depending on whether the table is partitioned or not.

Here's my recollection (for ORC, anyway):

  • Not partitioned: Spark uses totalSize
  • Partitioned and spark.sql.hive.convertMetastoreOrc=true: Spark prunes the partitions and sums the size of the files in the relevant partitions.
  • Partitioned and spark.sql.hive.convertMetastoreOrc=false: Spark uses totalSize
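A quick way to see which estimate the optimizer actually ends up with under a given configuration (these are real Spark confs; `fredorc` reuses the ORC repro table from above, and the estimate may be cached per session):

```scala
// Compare the size estimate with and without the Hive-to-datasource ORC conversion.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
println(spark.table("fredorc").queryExecution.optimizedPlan.stats.sizeInBytes)

spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
println(spark.table("fredorc").queryExecution.optimizedPlan.stats.sizeInBytes)  // per the list above, the Hive path uses totalSize
```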

bersprockets commented Sep 27, 2019

I was trying to figure out a few things, since my memory has faded. Here are my notes as I looked at some code (in the context of Parquet tables, but I assume the same applies to ORC):

Non-partitioned Hive table with statistics

Spark gets the statistics from totalSize in the Hive table.

When the user inserts new rows into the table, Spark updates totalSize in Hive (at least it does on my laptop). I assume insert operations from Hive will also update totalSize (although I have not tested that).

However, if you do something whacky like the following, the statistics get out-of-date (as you would expect):

import scala.util.Random
val rand = new Random
val df = spark.range(0, 5000000).map { x => ("hello there", 20, 20, 20, rand.nextLong % 20000) }.toDF("a", "b", "c", "d", "e")
df.write.mode("append").format("parquet").save("spark-warehouse/desertest")

Non-partitioned Hive table with no statistics

In this case, Spark gets the sizeInBytes from the HadoopFsRelation (relation.sizeInBytes) in the LogicalRelation, which in turn gets the table size from location (a FileIndex, or more specifically, InMemoryFileIndex).

Note that HadoopFsRelation.sizeInBytes will return location.sizeInBytes multiplied by the setting for spark.sql.sources.fileCompressionFactor (default 1.0). spark.sql.sources.fileCompressionFactor is similar in concept to our deserFactor, except it is across the board for all tables that have no statistics.
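For comparison, the existing global knob can be exercised like this (the conf name is the one mentioned above; `tbl` is a placeholder for any file-based table without statistics):

```scala
// HadoopFsRelation multiplies the summed file size by this factor for tables lacking statistics.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "3.0")
println(spark.table("tbl").queryExecution.optimizedPlan.stats.sizeInBytes)  // roughly 3x the on-disk size
```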

So it seems that when a Hive table has no statistics, Spark gets sizeInBytes from the sum of the current file sizes times the fileCompressionFactor. I guess in this case Spark will always be up to date with the file sizes. That is, when files are dumped into the table's directory (e.g., my example above), sizeInBytes will reflect the change.

If we had an external tool to calculate deserFactor, or an option on ANALYZE TABLE, we could provide a deserFactor in this case as well.

Note that when spark.sql.hive.convertMetastoreParquet=false and the table has no statistics, Spark just sets sizeInBytes to a giant default size (9223372036854775807).

attilapiros (Owner, Author)

Unfortunately, the Parquet getTotalUncompressedSize does not reflect the size of the data in memory: for PLAIN_DICTIONARY encoding it is really just the counter and the value size, not the multiplication of those two...
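For reference, this is roughly how that number can be read back from a Parquet footer (a sketch using parquet-mr classes bundled with Spark; the file path is a placeholder). For dictionary-encoded columns the reported "uncompressed" size still covers the dictionary ids plus the dictionary values rather than the expanded data, which is why the resulting ratio comes out too small:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path("/path/to/part-00000.snappy.parquet"))
val chunks = footer.getBlocks.asScala.flatMap(_.getColumns.asScala)
val uncompressed = chunks.map(_.getTotalUncompressedSize).sum  // encoded size, not in-memory size
val compressed = chunks.map(_.getTotalSize).sum
val ratio = uncompressed.toDouble / compressed                 // understates the real deserialization factor
```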

probably in the first round we will target only the ORC format

@bersprockets

probably in the first round we will target only the ORC format

Got it.

@attilapiros attilapiros closed this Oct 4, 2019
attilapiros pushed a commit that referenced this pull request Nov 13, 2019
### What changes were proposed in this pull request?
`org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` failed lately. After having a look at the logs, it just shows the following fact without any details:
```
Caused by: sbt.ForkMain$ForkError: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - Server not found in Kerberos database
```
Since the issue is intermittent and we are not able to reproduce it, we should add more debug information and wait for a reproduction with the extended logs.

### Why are the changes needed?
Failing test doesn't give enough debug information.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
I've started the test manually and checked that such additional debug messages show up:
```
>>> KrbApReq: APOptions are 00000000 00000000 00000000 00000000
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Looking for keys for: kafka/localhost@EXAMPLE.COM
Added key: 17version: 0
Added key: 23version: 0
Added key: 16version: 0
Found unsupported keytype (3) for kafka/localhost@EXAMPLE.COM
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Using builtin default etypes for permitted_enctypes
default etypes for permitted_enctypes: 17 16 23.
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
MemoryCache: add 1571936500/174770/16C565221B70AAB2BEFE31A83D13A2F4/client/localhost@EXAMPLE.COM to client/localhost@EXAMPLE.COM|kafka/localhost@EXAMPLE.COM
MemoryCache: Existing AuthList:
#3: 1571936493/200803/8CD70D280B0862C5DA1FF901ECAD39FE/client/localhost@EXAMPLE.COM
#2: 1571936499/985009/BAD33290D079DD4E3579A8686EC326B7/client/localhost@EXAMPLE.COM
#1: 1571936499/995208/B76B9D78A9BE283AC78340157107FD40/client/localhost@EXAMPLE.COM
```

Closes apache#26252 from gaborgsomogyi/SPARK-29580.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
attilapiros pushed a commit that referenced this pull request Jun 10, 2020
### What changes were proposed in this pull request?

This PR proposes to make `PythonFunction` hold `Seq[Byte]` instead of `Array[Byte]` to be able to compare whether the byte arrays have the same values for the cache manager.

### Why are the changes needed?

Currently the cache manager doesn't use the cache for a `udf` if the `udf` is created again, even if the function is the same.

```py
>>> func = lambda x: x

>>> df = spark.range(1)
>>> df.select(udf(func)("id")).cache()
```
```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#14 AS <lambda>(id)#12]
+- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#14]
 +- *(1) Range (0, 1, step=1, splits=12)
```

This is because `PythonFunction` holds `Array[Byte]`, and the `equals` method of an array returns true only when both arrays are the same instance.
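A minimal illustration of the equality difference being described (plain Scala, not code from the PR):

```scala
val a1 = Array[Byte](1, 2, 3)
val a2 = Array[Byte](1, 2, 3)
a1 == a2              // false: Scala arrays are Java arrays, so == means reference equality
a1.toSeq == a2.toSeq  // true: Seq compares contents, which is what the cache lookup needs
```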

### Does this PR introduce _any_ user-facing change?

Yes, if the user reuses the Python function for the UDF, the cache manager will detect the same function and use the cache for it.

### How was this patch tested?

I added a test case and also tested manually.

```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
InMemoryTableScan [<lambda>(id)#12]
   +- InMemoryRelation [<lambda>(id)#12], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [pythonUDF0#5 AS <lambda>(id)#3]
            +- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#5]
               +- *(1) Range (0, 1, step=1, splits=12)
```

Closes apache#28774 from ueshin/issues/SPARK-31945/udf_cache.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
attilapiros pushed a commit that referenced this pull request Jul 7, 2020
… without WindowExpression

### What changes were proposed in this pull request?

Add WindowFunction check at `CheckAnalysis`.

### Why are the changes needed?
Provide friendly error msg.

**BEFORE**
```scala
scala> sql("select rank() from values(1)").show
java.lang.UnsupportedOperationException: Cannot generate code for expression: rank()
```

**AFTER**
```scala
scala> sql("select rank() from values(1)").show
org.apache.spark.sql.AnalysisException: Window function rank() requires an OVER clause.;;
Project [rank() AS RANK()#3]
+- LocalRelation [col1#2]
```

### Does this PR introduce _any_ user-facing change?

Yes, the user will be given a better error message.

### How was this patch tested?

Pass the newly added UT.

Closes apache#28808 from ulysses-you/SPARK-31975.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
attilapiros pushed a commit that referenced this pull request Nov 4, 2022
…ly equivalent children in `RewriteDistinctAggregates`

### What changes were proposed in this pull request?

In `RewriteDistinctAggregates`, when grouping aggregate expressions by function children, treat children that are semantically equivalent as the same.

### Why are the changes needed?

This PR will reduce the number of projections in the Expand operator when there are multiple distinct aggregations with superficially different children. In some cases, it will eliminate the need for an Expand operator.

Example: In the following query, the Expand operator creates 3*n rows (where n is the number of incoming rows) because it has a projection for each of the function children `b + 1`, `1 + b` and `c`.

```
create or replace temp view v1 as
select * from values
(1, 2, 3.0),
(1, 3, 4.0),
(2, 4, 2.5),
(2, 3, 1.0)
v1(a, b, c);

select
  a,
  count(distinct b + 1),
  avg(distinct 1 + b) filter (where c > 0),
  sum(c)
from
  v1
group by a;
```
The Expand operator has three projections (each producing a row for each incoming row):
```
[a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
[a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
[a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
```
In reality, the Expand only needs one projection for `1 + b` and `b + 1`, because they are semantically equivalent.

With the proposed change, the Expand operator's projections look like this:
```
[a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
[a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
```
With one less projection, Expand produces 2*n rows instead of 3*n rows, but still produces the correct result.

In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.
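The equivalence check itself can be tried from the shell (Catalyst's `semanticEquals`, reached through the public `Column.expr`; a small sketch, not code from this PR):

```scala
import org.apache.spark.sql.functions.{col, lit}

val e1 = (col("b") + 1).expr
val e2 = (lit(1) + col("b")).expr
e1.semanticEquals(e2)  // true: canonicalization reorders the commutative Add, so b + 1 matches 1 + b
```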

Benchmark code in the JIRA (SPARK-40382).

Before the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                       14721          14859         195          5.7         175.5       1.0X
some semantically equivalent                      14569          14572           5          5.8         173.7       1.0X
none semantically equivalent                      14408          14488         113          5.8         171.8       1.0X
```
After the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                        3658           3692          49         22.9          43.6       1.0X
some semantically equivalent                       9124           9214         127          9.2         108.8       0.4X
none semantically equivalent                      14601          14777         250          5.7         174.1       0.3X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

Closes apache#37825 from bersprockets/rewritedistinct_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
attilapiros pushed a commit that referenced this pull request May 11, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions to go out of sync, eventually resulting in a query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>