Skip to content

[SPARK-14259] [SQL] Merging small files together based on the cost of opening #12095

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

case _ =>
val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"max #files: $maxFileNumInPartition")
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
Expand All @@ -150,7 +150,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

/**
 * Adds the given file to the partition currently being built.
 *
 * The file's contribution to the partition size is its length plus the
 * configured open cost, so many tiny files cannot be packed into one
 * partition just because their byte lengths are small.
 *
 * NOTE(review): the scraped diff rendered both the pre-change line
 * (`currentSize += file.length`) and the post-change line; only the
 * post-change accounting is kept here, matching the PR's intent.
 */
def addFile(file: PartitionedFile): Unit = {
  currentSize += file.length + openCostInBytes
  currentFiles.append(file)
}

Expand All @@ -170,13 +170,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes ||
currentFiles.length >= maxFileNumInPartition) {
if (currentSize + file.length > maxSplitBytes) {
closePartition()
addFile(file)
} else {
addFile(file)
}
addFile(file)
}
closePartition()
partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,13 @@ object SQLConf {
doc = "The maximum number of bytes to pack into a single partition when reading files.",
isPublic = true)

val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
defaultValue = Some(32),
doc = "The maximum number of files to pack into a single partition when reading files.",
isPublic = true)
// Estimated cost of opening one file, expressed as the number of bytes that could be
// scanned in the same time. Added to each file's length during partition bin-packing
// so that partitions holding many small files are not over-packed.
val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
  defaultValue = Some(4 * 1024 * 1024),
  doc = "The estimated cost to open a file, measured by the number of bytes that could be" +
    " scanned in the same time. This is used when putting multiple files into a partition." +
    " It is better to over-estimate it; then the partitions with small files will be faster" +
    " than partitions with bigger files (which are scheduled first).",
  isPublic = false)

val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
defaultValue = Some(true),
Expand Down Expand Up @@ -572,7 +575,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {

/** Maximum number of bytes to pack into a single partition when reading files. */
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
/** Estimated cost of opening a file, in bytes that could be scanned in the same time. */
def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)

/** Whether to compress cached data (see [[SQLConf.COMPRESS_CACHED]]). */
def useCompression: Boolean = getConf(COMPRESS_CACHED)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
"file2" -> 5,
"file3" -> 5))

withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// 5 byte files should be laid out [(5, 5), (5)]
assert(partitions.size == 2, "when checking partitions")
Expand All @@ -98,11 +99,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
createTable(
files = Seq(
"file1" -> 15,
"file2" -> 4))
"file2" -> 3))

withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(0-5), (5-10, 4)]
// Files should be laid out [(0-10), (10-15, 4)]
assert(partitions.size == 2, "when checking partitions")
assert(partitions(0).files.size == 1, "when checking partition 1")
assert(partitions(1).files.size == 2, "when checking partition 2")
Expand Down Expand Up @@ -132,8 +134,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
"file5" -> 1,
"file6" -> 1))

withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
assert(partitions.size == 4, "when checking partitions")
Expand Down