[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size #20072
Conversation
Test build #85363 has finished for PR 20072 at commit
" the in-disk and in-memory size of data is significantly different, users can adjust this" + | ||
" factor for a better choice of the execution plan. The default value is 1.0.") | ||
.doubleConf | ||
.createWithDefault(1.0) |
checkValue > 0.0
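That is, something along these lines on the new entry. This is a sketch only, written in the style of the surrounding SQLConf entries; the key name follows the diff above and the doc text is abbreviated, not quoted from the final patch:

```scala
// Sketch of the suggested positive-value check (key and doc text are placeholders).
val HADOOPFSRELATION_SIZE_FACTOR = buildConf("org.apache.spark.sql.execution.datasources.sizeFactor")
  .internal()
  .doc("Factor applied to the on-disk size of a HadoopFsRelation when estimating sizeInBytes.")
  .doubleConf
  .checkValue(_ > 0.0, "the size factor must be larger than 0")
  .createWithDefault(1.0)
```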
@@ -82,7 +84,15 @@ case class HadoopFsRelation(
    }
  }

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
  val size = location.sizeInBytes * hadoopFSSizeFactor
Overflow?
this should be handled by `size > Long.MaxValue`; the double value overflows only when the result is Double.PositiveInfinity, which would be capped as Long.MaxValue
nvm. hadoopFSSizeFactor is a double
@gatorsmile thanks for the review, Happy Christmas!
Test build #85383 has finished for PR 20072 at commit
@gatorsmile more comments?
.internal()
.doc("The result of multiplying this factor with the size of data source files is propagated" +
" to serve as the stats to choose the best execution plan. In the case where the " +
" the in-disk and in-memory size of data is significantly different, users can adjust this" +
nit: always put space at the end of the line for readability / consistency.
we have two spaces here
"...In the case where the " +
" the in-disk and in-..."
and "the" is doubled
actually https://github.com/apache/spark/pull/20072/files/ec275a841a7bb4c23b277f915debeed54e6cf7ea#diff-9a6b543db706f1a90f790783d6930a13R250 is missing a space at the end - could you also fix that
done, thanks
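For readers skimming the thread, a tiny standalone sketch (with hypothetical strings) of why the spaces at the fragment boundaries matter once the doc strings are concatenated:

```scala
// Standalone sketch: how a missing or an extra space shows up after concatenation.
object DocStringSpaces {
  def main(args: Array[String]): Unit = {
    // Trailing space on the first fragment plus a leading space on the next -> double space
    // (and, in the diff above, a doubled "the").
    println("In the case where the " + " the in-disk and in-memory size of data")
    // No space at the boundary at all -> the words run together.
    println("is propagated" + "to serve as the stats")
  }
}
```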
Test build #85548 has finished for PR 20072 at commit
@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HADOOPFSRELATION_SIZE_FACTOR = buildConf(
How about DISK_TO_MEMORY_SIZE_FACTOR? IMHO the current name doesn't describe the purpose clearly.
@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HADOOPFSRELATION_SIZE_FACTOR = buildConf(
  "org.apache.spark.sql.execution.datasources.sizeFactor")
Is this config for all data sources or only hadoopFS-related data sources?
this is only for HadoopFSRelation
@@ -60,6 +60,8 @@ case class HadoopFsRelation(
    }
  }

private val hadoopFSSizeFactor = sqlContext.conf.hadoopFSSizeFactor
shall we move it into the method sizeInBytes, since it's only used there?
override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
  val size = location.sizeInBytes * hadoopFSSizeFactor
  if (size > Long.MaxValue) {
I think this branch can be removed? Long.MaxValue is returned when converting a double value larger than Long.MaxValue.
@wzhfy thanks for the review, please take a look
@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
  "org.apache.spark.sql.execution.datasources.sizeFactor")
...sizeFactor is too vague, how about fileDataSizeFactor?
done
override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
  val sizeFactor = sqlContext.conf.sizeToMemorySizeFactor
  (location.sizeInBytes * sizeFactor).toLong
we should add a safe check for overflow.
before the latest commit, there was a safe check: e6065c7#diff-fcb68cd3c7630f337ce9a3b479b6d0c4R88
However, since sizeFactor is a double, any overflow with positive double numbers is capped as Double.PositiveInfinity, and as @wzhfy indicated, any double value larger than Long.MaxValue returns Long.MaxValue from its toLong method,
so it should be safe here
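A quick REPL-style check of that claim (the factor value is arbitrary):

```scala
// Double arithmetic saturates to +Infinity instead of wrapping, and Double#toLong
// caps anything larger than Long.MaxValue at Long.MaxValue, so no explicit check is needed.
val factor = 10.0
val inflated = Long.MaxValue.toDouble * factor                 // finite, but > Long.MaxValue
println(inflated.toLong == Long.MaxValue)                      // true
println(Double.PositiveInfinity.toLong == Long.MaxValue)       // true
println(Double.MaxValue * factor == Double.PositiveInfinity)   // true: positive overflow
```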
ah, good to know
Test build #85586 has finished for PR 20072 at commit
@@ -261,6 +261,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
  "org.apache.spark.sql.execution.datasources.fileDataSizeFactor")
similar to spark.sql.sources.parallelPartitionDiscovery.parallelism, how about spark.sql.sources.fileDataSizeFactor?
shouldn't we call this something like compressionFactor?
ah, compressionFactor sounds better.
LGTM, we should also add a test
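A sketch of what such a test could look like, assuming it sits in a suite mixing in SQLTestUtils/SharedSQLContext; the constant name FILE_COMPRESSION_FACTOR follows the naming discussion above and is not quoted from the merged patch:

```scala
// Hypothetical test sketch: with a factor of 2.0 the estimated relation size should double.
test("SPARK-22790: file compression factor scales HadoopFsRelation's sizeInBytes") {
  withTempPath { dir =>
    spark.range(1000).write.parquet(dir.getCanonicalPath)
    // Baseline uses the default factor of 1.0.
    val baseline = spark.read.parquet(dir.getCanonicalPath)
      .queryExecution.optimizedPlan.stats.sizeInBytes
    withSQLConf(SQLConf.FILE_COMPRESSION_FACTOR.key -> "2.0") {
      val scaled = spark.read.parquet(dir.getCanonicalPath)
        .queryExecution.optimizedPlan.stats.sizeInBytes
      assert(scaled == baseline * 2)
    }
  }
}
```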
Test build #85638 has finished for PR 20072 at commit
@cloud-fan @rxin @wzhfy @felixcheung @gatorsmile thanks for the review, the new name of the parameter and the test have been added
Test build #85757 has finished for PR 20072 at commit
Test build #85760 has finished for PR 20072 at commit
retest this please
Test build #85758 has finished for PR 20072 at commit
Test build #85762 has finished for PR 20072 at commit
"in-disk and in-memory size of data is significantly different, users can adjust this " + | ||
"factor for a better choice of the execution plan. The default value is 1.0.") | ||
.doubleConf | ||
.checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0") |
maybe >= 1.0? it's weird to see a compression factor less than 1.
BTW fileDataSizeFactor -> compressionFactor
it's not necessarily the case that parquet is always smaller than the in-memory size... e.g. with some simple datasets (like the one used in the test), parquet's overhead makes the overall size larger than the in-memory size,
but with the TPCDS dataset I observed that the parquet size is much smaller than the in-memory size
@@ -263,6 +263,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
  "spark.sql.sources.compressionFactor")
merge this with the previous line
BTW, how about fileCompressionFactor? Since it works only for file-based data sources.
@@ -263,6 +263,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
Rename this too: FILE_COMPRESSION_FACTOR
"spark.sql.sources.compressionFactor") | ||
.internal() | ||
.doc("The result of multiplying this factor with the size of data source files is propagated " + | ||
"to serve as the stats to choose the best execution plan. In the case where the " + |
When estimating the output data size of a table scan, multiply the file size with this factor as the estimated data size, in case the data is compressed in the file and leads to a heavily underestimated result.
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
    }
  }

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
  val sizeFactor = sqlContext.conf.diskToMemorySizeFactor
compressionFactor
cc @CodingCat
Test build #85949 has finished for PR 20072 at commit
Test build #85970 has finished for PR 20072 at commit
Test build #85988 has finished for PR 20072 at commit
retest this please
Test build #86045 has finished for PR 20072 at commit
Thanks! Merged to master/2.3
Merged as: [SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size
Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <nanzhu@uber.com>
Closes #20072 from CodingCat/SPARK-22790.
(cherry picked from commit ba891ec)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
What changes were proposed in this pull request?
As per the discussion in #19864 (comment), the current HadoopFsRelation size estimate is based purely on the underlying file size, which is not accurate and makes execution vulnerable to errors like OOM.
Users can enable CBO with the functionality in #19864 to avoid this issue.
This JIRA proposes adding a configurable factor to the sizeInBytes method of the HadoopFsRelation class so that users can mitigate this problem without CBO.
How was this patch tested?
Existing tests
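For reference, a minimal usage sketch. The key spark.sql.sources.fileCompressionFactor is assumed from the naming discussion above (the thread itself only shows earlier candidates), and the paths and column names are illustrative:

```scala
// Hypothetical usage: tell the planner that on-disk Parquet is ~3x smaller than the
// in-memory data, so size-based decisions (e.g. broadcast joins) use an inflated estimate.
import org.apache.spark.sql.SparkSession

object FileCompressionFactorExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fileCompressionFactor example")
      .master("local[*]")
      .config("spark.sql.sources.fileCompressionFactor", "3.0") // assumed final key
      .getOrCreate()

    val sales = spark.read.parquet("/data/sales")   // illustrative paths
    val items = spark.read.parquet("/data/items")
    // HadoopFsRelation.sizeInBytes now reports location.sizeInBytes * 3.0, making a
    // memory-blowing broadcast of a "small-looking" compressed table less likely.
    sales.join(items, "item_id").explain()
    spark.stop()
  }
}
```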