Skip to content

Commit

Permalink
Add ignore_missing_files Hive config (#8615)
Browse files Browse the repository at this point in the history
Summary:
Spark provides support for the data source configuration
`spark.sql.files.ignoreMissingFiles`, to ignore missing files while reading data
from files.
https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html#ignore-missing-files

Introduce Hive config ignore_missing_files to match this behavior.

Pull Request resolved: #8615

Reviewed By: xiaoxmeng

Differential Revision: D53335663

Pulled By: mbasmanova

fbshipit-source-id: 328e9abb20dba7a9b1c724e372967aa86b67c90f
  • Loading branch information
zhli1142015 authored and facebook-github-bot committed Feb 2, 2024
1 parent 2082743 commit 3c2b74a
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 15 deletions.
8 changes: 8 additions & 0 deletions velox/common/base/Exceptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ DECLARE_CHECK_FAIL_TEMPLATES(::facebook::velox::VeloxRuntimeError);
/* isRetriable */ false, \
##__VA_ARGS__)

// Raises a ::facebook::velox::VeloxRuntimeError through _VELOX_THROW, tagged
// with the runtime error source and the kFileNotFound error code, and marked
// as non-retriable. The variadic arguments are forwarded unchanged to
// _VELOX_THROW (at the visible call site: a format string followed by its
// arguments).
#define VELOX_FILE_NOT_FOUND_ERROR(...) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
::facebook::velox::error_code::kFileNotFound.c_str(), \
/* isRetriable */ false, \
##__VA_ARGS__)

#define VELOX_UNREACHABLE(...) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/VeloxException.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs;
// Errors indicating file read corruptions.
inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs;

// Errors indicating that a requested file does not exist.
inline constexpr auto kFileNotFound = "FILE_NOT_FOUND"_fs;

// We do not know how to classify it yet.
inline constexpr auto kUnknown = "UNKNOWN"_fs;
} // namespace error_code
Expand Down
22 changes: 22 additions & 0 deletions velox/common/base/tests/GTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@
<< "Expected error message to contain '" << (_errorMessage) \
<< "', but received '" << status.message() << "'."

// Evaluates '_expression' and verifies that it throws an exception of type
// '_type' whose errorCode() equals '_errorCode'. The enclosing test fails if
// no exception is thrown or if the error code differs. Exceptions that do not
// match '_type' are not caught here and propagate to the caller.
//
// '_errorCode' is parenthesized at every use so that an argument that is
// itself an expression (for example a conditional) keeps its intended meaning
// next to '==' and '<<'.
#define VELOX_ASSERT_ERROR_CODE_IMPL(_type, _expression, _errorCode) \
  try {                                                              \
    (_expression);                                                   \
    FAIL() << "Expected an exception";                               \
  } catch (const _type& e) {                                         \
    ASSERT_TRUE(e.errorCode() == (_errorCode))                       \
        << "Expected error code to be '" << (_errorCode)             \
        << "', but received '" << e.errorCode() << "'.";             \
  }

// Verifies that '_expression' throws a facebook::velox::VeloxException whose
// error code equals '_errorCode'.
#define VELOX_ASSERT_THROW_CODE(_expression, _errorCode) \
VELOX_ASSERT_ERROR_CODE_IMPL( \
facebook::velox::VeloxException, _expression, _errorCode)

// Verifies that '_expression' throws a facebook::velox::VeloxUserError whose
// error code equals '_errorCode'.
#define VELOX_ASSERT_USER_THROW_CODE(_expression, _errorCode) \
VELOX_ASSERT_ERROR_CODE_IMPL( \
facebook::velox::VeloxUserError, _expression, _errorCode)

// Verifies that '_expression' throws a facebook::velox::VeloxRuntimeError
// whose error code equals '_errorCode'.
#define VELOX_ASSERT_RUNTIME_THROW_CODE(_expression, _errorCode) \
VELOX_ASSERT_ERROR_CODE_IMPL( \
facebook::velox::VeloxRuntimeError, _expression, _errorCode)

#ifndef NDEBUG
#define DEBUG_ONLY_TEST(test_fixture, test_name) TEST(test_fixture, test_name)
#define DEBUG_ONLY_TEST_F(test_fixture, test_name) \
Expand Down
18 changes: 11 additions & 7 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,17 @@ uint64_t InMemoryWriteFile::size() const {

LocalReadFile::LocalReadFile(std::string_view path) : path_(path) {
fd_ = open(path_.c_str(), O_RDONLY);
VELOX_CHECK_GE(
fd_,
0,
"open failure in LocalReadFile constructor, {} {} {}.",
fd_,
path,
folly::errnoStr(errno));
// A negative descriptor means open() failed; errno identifies the cause.
if (fd_ < 0) {
if (errno == ENOENT) {
// A missing file is reported through the dedicated file-not-found error
// macro so that it can be told apart from other open failures.
VELOX_FILE_NOT_FOUND_ERROR("No such file or directory: {}", path);
} else {
// Any other failure is reported with the descriptor value, the path and
// the errno description.
VELOX_FAIL(
"open failure in LocalReadFile constructor, {} {} {}.",
fd_,
path,
folly::errnoStr(errno));
}
}
const off_t rc = lseek(fd_, 0, SEEK_END);
VELOX_CHECK_GE(
rc,
Expand Down
10 changes: 10 additions & 0 deletions velox/common/file/tests/FileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <fcntl.h>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
Expand Down Expand Up @@ -316,3 +317,12 @@ TEST(LocalFile, rmdir) {
// deleted, which is not an error.
EXPECT_NO_THROW(localFs->rmdir(tempFolder->path));
}

// Opening a path that does not exist through the local file system is
// expected to fail with a runtime error carrying the kFileNotFound error code.
TEST(LocalFile, fileNotFound) {
  filesystems::registerLocalFileSystem();

  // Build a path to a file named "file" inside a temporary directory; the test
  // never creates that file.
  const auto tempDir = ::exec::test::TempDirectoryPath::create();
  const auto missingFilePath = fmt::format("{}/file", tempDir->path);

  const auto fileSystem = filesystems::getFileSystem(missingFilePath, nullptr);
  VELOX_ASSERT_RUNTIME_THROW_CODE(
      fileSystem->openFileForRead(missingFilePath), error_code::kFileNotFound);
}
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ bool HiveConfig::isPartitionPathAsLowerCase(const Config* session) const {
return session->get<bool>(kPartitionPathAsLowerCaseSession, true);
}

// Reads the kIgnoreMissingFilesSession property from 'session', passing false
// as the default value.
bool HiveConfig::ignoreMissingFiles(const Config* session) const {
  constexpr bool kDefaultValue = false;
  return session->get<bool>(kIgnoreMissingFilesSession, kDefaultValue);
}

int64_t HiveConfig::maxCoalescedBytes() const {
return config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class HiveConfig {
static constexpr const char* kPartitionPathAsLowerCaseSession =
"partition_path_as_lower_case";

/// Session property name. If true, splits that refer to missing files don't
/// generate errors and are processed as empty splits.
static constexpr const char* kIgnoreMissingFilesSession =
"ignore_missing_files";

/// The max coalesce bytes for a request.
static constexpr const char* kMaxCoalescedBytes = "max-coalesced-bytes";

Expand Down Expand Up @@ -209,6 +212,8 @@ class HiveConfig {

bool isPartitionPathAsLowerCase(const Config* session) const;

/// Returns the value of the kIgnoreMissingFilesSession property read from
/// 'session'.
bool ignoreMissingFiles(const Config* session) const;

int64_t maxCoalescedBytes() const;

int32_t maxCoalescedDistanceBytes() const;
Expand Down
15 changes: 14 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/connectors/hive/SplitReader.h"

#include "velox/common/caching/CacheTTLController.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/TableHandle.h"
Expand Down Expand Up @@ -91,7 +92,19 @@ void SplitReader::prepareSplit(
VELOX_CHECK_NE(
baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);

auto fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
// Open the file backing this split. When the file-not-found error is raised
// and the ignore-missing-files session property is enabled, the split is
// marked empty and preparation stops here. Every other failure is rethrown
// unchanged.
std::shared_ptr<FileHandle> fileHandle;
try {
  fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
} catch (const VeloxRuntimeError& e) {
  // Caught by const reference: the exception is only inspected, never
  // modified, before being swallowed or rethrown.
  if (e.errorCode() == error_code::kFileNotFound.c_str() &&
      hiveConfig_->ignoreMissingFiles(
          connectorQueryCtx_->sessionProperties())) {
    emptySplit_ = true;
    return;
  }
  throw;
}
// Here we keep adding new entries to CacheTTLController when new fileHandles
// are generated, if CacheTTLController was created. Creator of
// CacheTTLController needs to make sure a size control strategy was available
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/tests/HiveConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ TEST(HiveConfigTest, overrideSession) {
{HiveConfig::kOrcWriterMaxDictionaryMemorySession, "22MB"},
{HiveConfig::kSortWriterMaxOutputRowsSession, "20"},
{HiveConfig::kSortWriterMaxOutputBytesSession, "20MB"},
{HiveConfig::kPartitionPathAsLowerCaseSession, "false"}};
{HiveConfig::kPartitionPathAsLowerCaseSession, "false"},
{HiveConfig::kIgnoreMissingFilesSession, "true"}};
const auto session = std::make_unique<MemConfig>(sessionOverride);
ASSERT_EQ(
hiveConfig->insertExistingPartitionsBehavior(session.get()),
Expand Down Expand Up @@ -174,4 +175,5 @@ TEST(HiveConfigTest, overrideSession) {
ASSERT_EQ(hiveConfig->sortWriterMaxOutputRows(session.get()), 20);
ASSERT_EQ(hiveConfig->sortWriterMaxOutputBytes(session.get()), 20UL << 20);
ASSERT_EQ(hiveConfig->isPartitionPathAsLowerCase(session.get()), false);
ASSERT_EQ(hiveConfig->ignoreMissingFiles(session.get()), true);
}
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@ Each query can override the config by setting corresponding query session proper
- bool
- true
- If true, the partition directory will be converted to lowercase when executing a table write operation.
* - ignore_missing_files
-
- bool
- false
- If true, splits that refer to missing files don't generate errors and are processed as empty splits.
* - max-coalesced-bytes
-
- integer
Expand Down
19 changes: 13 additions & 6 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1388,12 +1388,19 @@ TEST_F(TableScanTest, splitOffsetAndLength) {
}

TEST_F(TableScanTest, fileNotFound) {
CursorParameters params;
params.planNode = tableScanNode();

auto cursor = TaskCursor::create(params);
cursor->task()->addSplit("0", makeHiveSplit("/path/to/nowhere.orc"));
EXPECT_THROW(cursor->moveNext(), VeloxRuntimeError);
// Split for a path that is presumably absent on the test machine.
auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
// Runs the table scan over 'split' with the Hive session property
// kIgnoreMissingFilesSession set from 'ignoreMissingFiles' and asserts that
// the scan produces no results.
auto assertMissingFile = [&](bool ignoreMissingFiles) {
AssertQueryBuilder(tableScanNode())
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::kIgnoreMissingFilesSession,
// std::to_string on a bool yields "1" or "0", not "true"/"false".
// NOTE(review): relies on the session config accepting "1"/"0" as
// boolean values - confirm.
std::to_string(ignoreMissingFiles))
.split(split)
.assertEmptyResults();
};
// With the property enabled the missing file is treated as an empty split.
assertMissingFile(true);
// With the property disabled the scan is expected to fail with kFileNotFound.
VELOX_ASSERT_RUNTIME_THROW_CODE(
assertMissingFile(false), error_code::kFileNotFound);
}

// A valid ORC file (containing headers) but no data.
Expand Down

0 comments on commit 3c2b74a

Please sign in to comment.