[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size #20072

Closed · wants to merge 13 commits

Conversation

CodingCat
Contributor

What changes were proposed in this pull request?

As per the discussion in #19864 (comment):

HadoopFsRelation's current size estimate is based purely on the size of the underlying files, which is inaccurate and makes execution vulnerable to errors such as OOM.

Users can enable CBO with the functionality added in #19864 to avoid this issue.

This JIRA proposes adding a configurable factor to the sizeInBytes method of the HadoopFsRelation class so that users can mitigate the problem without enabling CBO.
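A minimal sketch of the proposed shape (illustrative only; the conf accessor name below is a placeholder, and the naming changed several times during review, see the comments further down):

```scala
// Sketch: scale the raw file size reported by the file index by a
// user-configurable factor before it feeds into plan-size estimation.
override def sizeInBytes: Long = {
  // hypothetical accessor name; the actual conf is introduced/renamed below
  val factor = sqlContext.conf.fileCompressionFactor
  // Double arithmetic: a product beyond the Long range becomes a huge Double
  // (or Infinity) and toLong caps it at Long.MaxValue, so it cannot wrap around.
  (location.sizeInBytes * factor).toLong
}
```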

How was this patch tested?

Existing tests

SparkQA commented Dec 25, 2017

Test build #85363 has finished for PR 20072 at commit e6065c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

" the in-disk and in-memory size of data is significantly different, users can adjust this" +
" factor for a better choice of the execution plan. The default value is 1.0.")
.doubleConf
.createWithDefault(1.0)
Member

Add a checkValue(_ > 0.0) here.
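For illustration, the full conf builder with that validation might read roughly like this (a sketch: the key, val name, and doc text are placeholders, and the actual key is renamed later in this review):

```scala
// Placeholder sketch of the SQLConf entry, not the merged code.
val FILE_SIZE_FACTOR = buildConf("spark.sql.sources.fileSizeFactor")
  .internal()
  .doc("Multiply the on-disk size of data source files by this factor when " +
    "estimating the output size of a table scan.")
  .doubleConf
  .checkValue(_ > 0.0, "the size factor must be larger than 0")
  .createWithDefault(1.0)
```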

@@ -82,7 +84,15 @@ case class HadoopFsRelation(
}
}

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
val size = location.sizeInBytes * hadoopFSSizeFactor
Member

Overflow?

Contributor Author

This should be handled by the size > Long.MaxValue check; the double value overflows only to Double.PositiveInfinity, which would be capped at Long.MaxValue.

Member

nvm. hadoopFSSizeFactor is a double

@CodingCat
Contributor Author

@gatorsmile thanks for the review, Happy Christmas!

SparkQA commented Dec 25, 2017

Test build #85383 has finished for PR 20072 at commit ec275a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

@gatorsmile more comments?

.internal()
.doc("The result of multiplying this factor with the size of data source files is propagated" +
" to serve as the stats to choose the best execution plan. In the case where the " +
" the in-disk and in-memory size of data is significantly different, users can adjust this" +
Member

nit: always put the space at the end of the line, for readability / consistency.
We have two spaces here:

"...In the case where the " +
" the in-disk and in-..."

Member

and "the" is doubled here.

Contributor Author

done, thanks

SparkQA commented Dec 30, 2017

Test build #85548 has finished for PR 20072 at commit 2a33b88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HADOOPFSRELATION_SIZE_FACTOR = buildConf(
Contributor

How about DISK_TO_MEMORY_SIZE_FACTOR? IMHO the current name doesn't describe the purpose clearly.

@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HADOOPFSRELATION_SIZE_FACTOR = buildConf(
"org.apache.spark.sql.execution.datasources.sizeFactor")
Contributor

Is this config for all data sources or only hadoopFS-related data sources?

Contributor Author

This is only for HadoopFsRelation.

@@ -60,6 +60,8 @@ case class HadoopFsRelation(
}
}

private val hadoopFSSizeFactor = sqlContext.conf.hadoopFSSizeFactor
Contributor

shall we move it into the method sizeInBytes since it's only used there?

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
val size = location.sizeInBytes * hadoopFSSizeFactor
if (size > Long.MaxValue) {
Contributor

I think this branch can be removed? Long.MaxValue is returned when converting a double value larger than Long.MaxValue.

@CodingCat
Contributor Author

@wzhfy thanks for the review, please take a look

@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
"org.apache.spark.sql.execution.datasources.sizeFactor")
Contributor

...sizeFactor is too vague, how about fileDataSizeFactor?

Contributor Author

done

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
val sizeFactor = sqlContext.conf.sizeToMemorySizeFactor
(location.sizeInBytes * sizeFactor).toLong
Contributor

we should add a safe check for overflow.

Contributor Author

Before the latest commit there was a safe check: e6065c7#diff-fcb68cd3c7630f337ce9a3b479b6d0c4R88

However, since sizeFactor is a double, any overflow with positive double numbers is capped at Double.PositiveInfinity, and, as @wzhfy indicated, any double value larger than Long.MaxValue returns Long.MaxValue from its toLong method.

So it should be safe here.
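For reference, a small standalone snippet illustrating the JVM conversion behaviour being relied on here (not code from this PR):

```scala
// Multiplying a Long by a Double promotes to Double; results beyond the Long
// range (including Infinity) are capped by toLong instead of wrapping around.
val fileSize: Long = Long.MaxValue
val factor: Double = 2.0
val scaled: Double = fileSize * factor          // ~1.84e19, larger than Long.MaxValue

assert(scaled.toLong == Long.MaxValue)                   // capped, not negative
assert(Double.PositiveInfinity.toLong == Long.MaxValue)  // Infinity is capped too
```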

Contributor

Ah, good to know.

SparkQA commented Jan 2, 2018

Test build #85586 has finished for PR 20072 at commit e97f419.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
"org.apache.spark.sql.execution.datasources.fileDataSizeFactor")
Contributor

Similar to spark.sql.sources.parallelPartitionDiscovery.parallelism, how about spark.sql.sources.fileDataSizeFactor?

Contributor

shouldn't we call this something like compressionFactor?

Contributor

ah compressionFactor sounds better.

@cloud-fan
Contributor

LGTM, we should also add a test
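Not the test that eventually landed, but a rough sketch of what such a test could check, assuming a SparkSession named `spark`, a Parquet directory at `path`, and the conf key the review converges on (`spark.sql.sources.fileCompressionFactor`); whether the analyzed plan surfaces the relation's sizeInBytes this way is also an assumption:

```scala
// Hypothetical check: the estimated scan size should scale linearly with the factor.
def estimatedScanSize(factor: String): BigInt = {
  spark.conf.set("spark.sql.sources.fileCompressionFactor", factor)
  spark.read.parquet(path).queryExecution.analyzed.stats.sizeInBytes
}

val base    = estimatedScanSize("1.0")
val doubled = estimatedScanSize("2.0")
assert(doubled == base * 2)
```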

SparkQA commented Jan 3, 2018

Test build #85638 has finished for PR 20072 at commit a0f3462.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

@cloud-fan @rxin @wzhfy @felixcheung @gatorsmile thanks for the review; the parameter has been renamed and a test has been added.

SparkQA commented Jan 6, 2018

Test build #85757 has finished for PR 20072 at commit 2f6e3c9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 6, 2018

Test build #85760 has finished for PR 20072 at commit 291ce3a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

retest this please

SparkQA commented Jan 6, 2018

Test build #85758 has finished for PR 20072 at commit 670a6c0.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 6, 2018

Test build #85762 has finished for PR 20072 at commit 291ce3a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"in-disk and in-memory size of data is significantly different, users can adjust this " +
"factor for a better choice of the execution plan. The default value is 1.0.")
.doubleConf
.checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0")
Contributor

maybe >= 1.0? it's weird to see a compression factor less than 1.

Contributor

BTW fileDataSizeFactor -> compressionFactor

Contributor Author

It's not necessarily the case that the Parquet data is always smaller than the in-memory size. For some simple datasets (like the one used in the test), Parquet's overhead makes the on-disk size larger than the in-memory size.

But with the TPC-DS dataset, I observed that the Parquet size is much smaller than the in-memory size.

@@ -263,6 +263,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
"spark.sql.sources.compressionFactor")
Contributor

merge this with the previous line

Contributor

BTW, how about fileCompressionFactor, since it works only for file-based data sources?

@@ -263,6 +263,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
Contributor

Rename this too: FILE_COMPRESSION_FACTOR.

"spark.sql.sources.compressionFactor")
.internal()
.doc("The result of multiplying this factor with the size of data source files is propagated " +
"to serve as the stats to choose the best execution plan. In the case where the " +
Contributor

When estimating the output data size of a table scan, multiply the file size by this factor to get the estimated data size, since the data may be compressed in the file, which would otherwise lead to a heavily underestimated result.
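For illustration, how a user might set this factor (assuming the `spark.sql.sources.fileCompressionFactor` key suggested above and a session named `spark`; it is an internal conf but can still be set like any other):

```scala
// Treat compressed files as roughly 3x larger once decompressed in memory,
// so join-strategy and broadcast decisions see a less underestimated size.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "3.0")
val df = spark.read.parquet("/path/to/compressed/table")  // hypothetical path
```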

@@ -82,7 +82,11 @@ case class HadoopFsRelation(
}
}

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
val sizeFactor = sqlContext.conf.diskToMemorySizeFactor
Contributor

compressionFactor

@gatorsmile
Member

cc @CodingCat

SparkQA commented Jan 11, 2018

Test build #85949 has finished for PR 20072 at commit 5230081.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 11, 2018

Test build #85970 has finished for PR 20072 at commit 6fe8589.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 12, 2018

Test build #85988 has finished for PR 20072 at commit c584c61.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

retest this please

SparkQA commented Jan 12, 2018

Test build #86045 has finished for PR 20072 at commit c584c61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master/2.3

asfgit pushed a commit that referenced this pull request Jan 13, 2018
[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size

## What changes were proposed in this pull request?

As per the discussion in #19864 (comment):

HadoopFsRelation's current size estimate is based purely on the size of the underlying files, which is inaccurate and makes execution vulnerable to errors such as OOM.

Users can enable CBO with the functionality added in #19864 to avoid this issue.

This JIRA proposes adding a configurable factor to the sizeInBytes method of the HadoopFsRelation class so that users can mitigate the problem without enabling CBO.

## How was this patch tested?

Existing tests

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <nanzhu@uber.com>

Closes #20072 from CodingCat/SPARK-22790.

(cherry picked from commit ba891ec)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
asfgit closed this in ba891ec on Jan 13, 2018