[Gluten-4585][VL] Support spark.sql.files.ignoreMissingFiles=true #4725

Merged 5 commits on Feb 21, 2024
2 changes: 2 additions & 0 deletions cpp/core/config/GlutenConfig.h
@@ -34,6 +34,8 @@ const std::string kLegacySize = "spark.sql.legacy.sizeOfNull";

const std::string kSessionTimezone = "spark.sql.session.timeZone";

const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";

const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";

const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes";
3 changes: 2 additions & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
@@ -525,7 +525,8 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig()
configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = "6";
configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] =
std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));

configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
return std::make_shared<velox::core::MemConfig>(configs);
}

Expand Down
2 changes: 1 addition & 1 deletion docs/velox-backend-limitations.md
@@ -42,7 +42,7 @@ In certain cases, Gluten result may be different from Vanilla spark.
Velox only supports strings surrounded by double quotes, not single quotes, in JSON data. If single quotes are used, Gluten will produce incorrect results.

#### Parquet read conf
Gluten supports `spark.files.ignoreCorruptFiles` and `spark.files.ignoreMissingFiles` with the default value false; if set to true, the behavior is the same as when they are false.
Gluten supports `spark.files.ignoreCorruptFiles` with the default value false; if set to true, the behavior is the same as when it is false.
Gluten ignores `spark.sql.parquet.datetimeRebaseModeInRead`; it only returns what is written in the Parquet file. It does not consider the difference between the legacy
hybrid (Julian Gregorian) calendar and the Proleptic Gregorian calendar, so the result may differ from vanilla Spark.

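For context, here is a minimal sketch (not part of this PR's diff) of how the conf is expected to be used from the Spark side; the input path and application name are illustrative assumptions.

```scala
// Minimal sketch, assuming an illustrative input path; not taken from this PR.
// With spark.sql.files.ignoreMissingFiles=true, files that disappear between
// query planning and execution are skipped instead of failing the scan.
import org.apache.spark.sql.SparkSession

object IgnoreMissingFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ignore-missing-files-sketch").getOrCreate()
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    // Read a directory whose files may have been deleted after planning;
    // rows from the missing files are simply omitted from the result.
    spark.read.parquet("/tmp/example/parquet_dir").show()
    spark.stop()
  }
}
```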
@@ -319,6 +319,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenFileBasedDataSourceSuite]
.exclude("SPARK-23072 Write and read back unicode column names - csv")
.excludeByPrefix("Enabling/disabling ignoreMissingFiles using")
.excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using")
.exclude("Spark native readers should respect spark.sql.caseSensitive - parquet")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
.exclude("SPARK-25237 compute correct input metrics in FileScanRDD")
@@ -367,7 +367,7 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("SPARK-22790")
// plan is different because metric is different, rewrite
.excludeByPrefix("SPARK-25237")
// ignoreMissingFiles mode, wait to fix
// ignoreMissingFiles mode: error msg from velox is different, rewrite
.exclude("Enabling/disabling ignoreMissingFiles using parquet")
.exclude("Enabling/disabling ignoreMissingFiles using orc")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

import org.apache.hadoop.fs.Path

import java.io.FileNotFoundException

import scala.collection.mutable

class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
@@ -173,4 +176,70 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with Glute
}
}

Seq("orc", "parquet").foreach {
format =>
testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath

Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("2").toDF("a").write.format(format).save(firstPath.toString)
Seq("3").toDF("a").write.format(format).save(thirdPath.toString)
val files = Seq(firstPath, thirdPath).flatMap {
p => fs.listStatus(p).filter(_.isFile).map(_.getPath)
}

val df = spark.read
.options(options)
.format(format)
.load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
new Path(basePath, "fourth").toString)

// Make sure all data files are deleted and can't be opened.
files.foreach(f => fs.delete(f, false))
assert(fs.delete(thirdPath, true))
for (f <- files) {
intercept[FileNotFoundException](fs.open(f))
}

checkAnswer(df, Seq(Row("0"), Row("1")))
}
}

// Test setting ignoreMissingFiles via SQL conf.
// This test is rewritten because the error message from Velox differs from vanilla Spark's.
for {
(ignore, options, sqlConf) <- Seq(
// Set via SQL Conf: leave options empty
("true", Map.empty[String, String], "true"),
("false", Map.empty[String, String], "false")
)
sources <- Seq("", format)
} {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> sources,
SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
if (ignore.toBoolean) {
testIgnoreMissingFiles(options)
} else {
val exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
}
assert(exception.getMessage().contains("No such file or directory"))
}
}
}
}
}

}
@@ -329,6 +329,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenFileBasedDataSourceSuite]
.exclude("SPARK-23072 Write and read back unicode column names - csv")
.excludeByPrefix("Enabling/disabling ignoreMissingFiles using")
.excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using")
.exclude("Spark native readers should respect spark.sql.caseSensitive - parquet")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
.exclude("SPARK-25237 compute correct input metrics in FileScanRDD")
@@ -1055,7 +1055,7 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("SPARK-22790")
// plan is different because metric is different, rewrite
.excludeByPrefix("SPARK-25237")
// ignoreMissingFiles mode, wait to fix
// ignoreMissingFiles mode: error msg from velox is different, rewrite
.exclude("Enabling/disabling ignoreMissingFiles using parquet")
.exclude("Enabling/disabling ignoreMissingFiles using orc")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

import org.apache.hadoop.fs.Path

import java.io.FileNotFoundException

import scala.collection.mutable

class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
@@ -174,4 +177,70 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with Glute
}
}

Seq("orc", "parquet").foreach {
format =>
testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath

Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("2").toDF("a").write.format(format).save(firstPath.toString)
Seq("3").toDF("a").write.format(format).save(thirdPath.toString)
val files = Seq(firstPath, thirdPath).flatMap {
p => fs.listStatus(p).filter(_.isFile).map(_.getPath)
}

val df = spark.read
.options(options)
.format(format)
.load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
new Path(basePath, "fourth").toString)

// Make sure all data files are deleted and can't be opened.
files.foreach(f => fs.delete(f, false))
assert(fs.delete(thirdPath, true))
for (f <- files) {
intercept[FileNotFoundException](fs.open(f))
}

checkAnswer(df, Seq(Row("0"), Row("1")))
}
}

// Test setting ignoreMissingFiles via SQL conf.
// This test is rewritten because the error message from Velox differs from vanilla Spark's.
for {
(ignore, options, sqlConf) <- Seq(
// Set via SQL Conf: leave options empty
("true", Map.empty[String, String], "true"),
("false", Map.empty[String, String], "false")
)
sources <- Seq("", format)
} {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> sources,
SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
if (ignore.toBoolean) {
testIgnoreMissingFiles(options)
} else {
val exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
}
assert(exception.getMessage().contains("No such file or directory"))
}
}
}
}
}

}
@@ -327,6 +327,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenFileBasedDataSourceSuite]
.exclude("SPARK-23072 Write and read back unicode column names - csv")
.excludeByPrefix("Enabling/disabling ignoreMissingFiles using")
.excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using")
.exclude("Spark native readers should respect spark.sql.caseSensitive - parquet")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
.exclude("SPARK-25237 compute correct input metrics in FileScanRDD")
@@ -1053,7 +1053,7 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("SPARK-22790")
// plan is different because metric is different, rewrite
.excludeByPrefix("SPARK-25237")
// ignoreMissingFiles mode, wait to fix
// error msg from Velox is different and reader options are not supported, rewrite
.exclude("Enabling/disabling ignoreMissingFiles using parquet")
.exclude("Enabling/disabling ignoreMissingFiles using orc")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

import org.apache.hadoop.fs.Path

import java.io.FileNotFoundException

import scala.collection.mutable

class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
@@ -174,4 +177,71 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with Glute
}
}

Seq("orc", "parquet").foreach {
format =>
testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath

Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("2").toDF("a").write.format(format).save(firstPath.toString)
Seq("3").toDF("a").write.format(format).save(thirdPath.toString)
val files = Seq(firstPath, thirdPath).flatMap {
p => fs.listStatus(p).filter(_.isFile).map(_.getPath)
}

val df = spark.read
.options(options)
.format(format)
.load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
new Path(basePath, "fourth").toString)

// Make sure all data files are deleted and can't be opened.
files.foreach(f => fs.delete(f, false))
assert(fs.delete(thirdPath, true))
for (f <- files) {
intercept[FileNotFoundException](fs.open(f))
}

checkAnswer(df, Seq(Row("0"), Row("1")))
}
}

// Test setting ignoreMissingFiles via SQL conf.
// This test is rewritten because the error message from Velox differs and data source
// reader options are not supported.
for {
(ignore, options, sqlConf) <- Seq(
// Set via SQL Conf: leave options empty
("true", Map.empty[String, String], "true"),
("false", Map.empty[String, String], "false")
)
sources <- Seq("", format)
} {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> sources,
SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
if (ignore.toBoolean) {
testIgnoreMissingFiles(options)
} else {
val exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
}
assert(exception.getMessage().contains("No such file or directory"))
}
}
}
}
}

}
@@ -529,7 +529,8 @@ object GlutenConfig {
})

val keyWithDefault = ImmutableList.of(
(SQLConf.CASE_SENSITIVE.key, "false")
(SQLConf.CASE_SENSITIVE.key, "false"),
(SQLConf.IGNORE_MISSING_FILES.key, "false")
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))

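As a side note, here is a small self-contained sketch of the key-with-default forwarding pattern used in the hunk above; the names sessionConf and nativeConfMap are stand-ins for illustration, not Gluten's actual fields.

```scala
// Sketch of the forwarding pattern above, with stand-in names; not Gluten's actual code.
import scala.collection.mutable

object ForwardDefaultsSketch {
  def main(args: Array[String]): Unit = {
    // Values the user set in the Spark session.
    val sessionConf = Map("spark.sql.caseSensitive" -> "true")
    // Keys that must always reach the native side, with their defaults.
    val keyWithDefault = Seq(
      ("spark.sql.caseSensitive", "false"),
      ("spark.sql.files.ignoreMissingFiles", "false"))
    val nativeConfMap = mutable.Map[String, String]()
    // Forward the session value when present, otherwise the default, so the
    // native (Velox) side always sees an explicit setting for these keys.
    keyWithDefault.foreach { case (k, d) => nativeConfMap.put(k, sessionConf.getOrElse(k, d)) }
    println(nativeConfMap)
  }
}
```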