[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size #20072

Closed
wants to merge 13 commits into from
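For context, a minimal usage sketch of the setting this PR introduces (not part of the diff). It assumes a Spark build that contains this change; the 10.0 factor, the table path, and the join key are illustrative placeholders.

import org.apache.spark.sql.SparkSession

object FileCompressionFactorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-22790 demo")
      .master("local[*]")
      .getOrCreate()

    // Tell the planner that on-disk file sizes understate the uncompressed data
    // size by roughly 10x (illustrative value for heavily compressed files).
    spark.conf.set("spark.sql.sources.fileCompressionFactor", "10.0")

    // Scans backed by HadoopFsRelation now report sizeInBytes = fileSize * factor,
    // which makes the broadcast-join size check more conservative.
    val table = spark.read.parquet("/path/to/compressed/table")  // placeholder path
    table.join(table, Seq("id")).explain()                       // placeholder join key

    spark.stop()
  }
}

Because the factor multiplies the on-disk size, values above 1.0 make broadcast decisions more conservative for compressed data, while the 1.0 default preserves the existing behaviour.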
SQLConf.scala
@@ -249,7 +249,7 @@ object SQLConf {
val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
.internal()
.doc("When true, the query optimizer will infer and propagate data constraints in the query " +
"plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
"plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
"for certain kinds of query plans (such as those with a large number of predicates and " +
"aliases) which might negatively impact overall runtime.")
.booleanConf
@@ -263,6 +263,15 @@
.booleanConf
.createWithDefault(false)

val FILE_COMRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
.internal()
.doc("When estimating the output data size of a table scan, multiply the file size with this " +
"factor as the estimated data size, in case the data is compressed in the file and lead to" +
" a heavily underestimated result.")
.doubleConf
.checkValue(_ > 0, "the value of fileCompressionFactor must be larger than 0")
.createWithDefault(1.0)

val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
@@ -1241,6 +1250,8 @@ class SQLConf extends Serializable with Logging {

def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)

def fileCompressionFactor: Double = getConf(FILE_COMRESSION_FACTOR)

def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)

/**
HadoopFsRelation.scala
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
}
}

- override def sizeInBytes: Long = location.sizeInBytes
+ override def sizeInBytes: Long = {
+   val compressionFactor = sqlContext.conf.fileCompressionFactor
+   (location.sizeInBytes * compressionFactor).toLong
+ }


override def inputFiles: Array[String] = location.inputFiles
}
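To make the planner impact concrete, here is a small standalone sketch (object and method names invented for illustration, not Spark source) of the arithmetic the new sizeInBytes performs, using the 740-byte file size and the 400-byte spark.sql.autoBroadcastJoinThreshold from the test below.

object SizeEstimateSketch {
  // Mirrors (location.sizeInBytes * compressionFactor).toLong from the diff above.
  def estimatedSize(fileSizeInBytes: Long, compressionFactor: Double): Long =
    (fileSizeInBytes * compressionFactor).toLong

  def main(args: Array[String]): Unit = {
    val fileSize = 740L            // on-disk Parquet size used in the test below
    val broadcastThreshold = 400L  // spark.sql.autoBroadcastJoinThreshold in the test

    // factor 1.0: 740 > 400, so the planner falls back to a sort-merge join
    println(estimatedSize(fileSize, 1.0) <= broadcastThreshold)  // false
    // factor 0.5: 370 <= 400, so the relation qualifies for a broadcast hash join
    println(estimatedSize(fileSize, 0.5) <= broadcastThreshold)  // true
  }
}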
HadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.{File, FilenameFilter}

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.test.SharedSQLContext

class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
}
}

test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
import testImplicits._
Seq(1.0, 0.5).foreach { compressionFactor =>
withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
"spark.sql.autoBroadcastJoinThreshold" -> "400") {
withTempPath { workDir =>
// the file size is 740 bytes
val workDirPath = workDir.getAbsolutePath
val data1 = Seq(100, 200, 300, 400).toDF("count")
data1.write.parquet(workDirPath + "/data1")
val df1FromFile = spark.read.parquet(workDirPath + "/data1")
val data2 = Seq(100, 200, 300, 400).toDF("count")
data2.write.parquet(workDirPath + "/data2")
val df2FromFile = spark.read.parquet(workDirPath + "/data2")
val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
if (compressionFactor == 0.5) {
val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
case bJoin: BroadcastHashJoinExec => bJoin
}
assert(bJoinExec.nonEmpty)
val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
case smJoin: SortMergeJoinExec => smJoin
}
assert(smJoinExec.isEmpty)
} else {
// compressionFactor is 1.0
val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
case bJoin: BroadcastHashJoinExec => bJoin
}
assert(bJoinExec.isEmpty)
val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
case smJoin: SortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
}
}
}
}
}
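A quick way to check the behaviour outside the test suite is to compare the logical plan statistics against the files' size on disk, mirroring the logical.stats.sizeInBytes assertion above. A hedged spark-shell sketch follows; the path and the 2.0 factor are placeholders.

// spark-shell sketch: verify that the factor scales the reported statistics.
val path = "/tmp/spark-22790-demo"                       // placeholder location
spark.range(0, 1000).toDF("count").write.mode("overwrite").parquet(path)

spark.conf.set("spark.sql.sources.fileCompressionFactor", "2.0")
val df = spark.read.parquet(path)

// Expected to be roughly 2x the total size of the Parquet files on disk.
println(df.queryExecution.logical.stats.sizeInBytes)

With the default of 1.0 the printed value should match the plain file size, as asserted by the first test in this suite.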