Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ HiveConfig::insertExistingPartitionsBehavior(const Config* config) {
: InsertExistingPartitionsBehavior::kError;
}

// Reads the boolean stored under kCaseSensitive in 'config'. When the
// property is not set, case-sensitive behavior (true) is assumed.
bool HiveConfig::isCaseSensitive(const Config* config) {
  constexpr bool kDefaultCaseSensitive = true;
  return config->get<bool>(kCaseSensitive, kDefaultCaseSensitive);
}

} // namespace facebook::velox::connector::hive
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class HiveConfig {

static InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* config);

/// Name of the boolean config property looked up by isCaseSensitive().
static constexpr const char* kCaseSensitive = "case_sensitive";

/// Returns the value of the kCaseSensitive property from 'config'.
/// NOTE(review): 'config' is presumably expected to be non-null; confirm
/// against callers.
static bool isCaseSensitive(const Config* config);
};

} // namespace facebook::velox::connector::hive
14 changes: 8 additions & 6 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ HiveDataSource::HiveDataSource(
ExpressionEvaluator* expressionEvaluator,
memory::MemoryAllocator* allocator,
const std::string& scanId,
bool caseSensitive,
folly::Executor* executor)
: outputType_(outputType),
fileHandleFactory_(fileHandleFactory),
Expand Down Expand Up @@ -196,6 +197,8 @@ HiveDataSource::HiveDataSource(
readerOutputType_ = ROW(std::move(names), std::move(types));
}

readerOpts_.setCaseSensitive(caseSensitive);

rowReaderOpts_.setScanSpec(scanSpec_);
rowReaderOpts_.setMetadataFilter(metadataFilter_);

Expand Down Expand Up @@ -257,7 +260,8 @@ template <TypeKind ToKind>
velox::variant convertFromString(const std::optional<std::string>& value) {
if (value.has_value()) {
// No need for casting if ToKind is VARCHAR or VARBINARY.
if constexpr (ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
if constexpr (
ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
return velox::variant(value.value());
}
bool nullOutput = false;
Expand Down Expand Up @@ -385,10 +389,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
scanSpec_->resetCachedValues();

// Check filters and see if the whole split can be skipped
if (!testFilters(
scanSpec_.get(),
reader_.get(),
split_->filePath)) {
if (!testFilters(scanSpec_.get(), reader_.get(), split_->filePath)) {
emptySplit_ = true;
++runtimeStats_.skippedSplits;
runtimeStats_.skippedSplitBytes += split_->length;
Expand All @@ -407,7 +408,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
static const RowTypePtr kEmpty{ROW({}, {})};
cs = std::make_shared<dwio::common::ColumnSelector>(kEmpty);
} else {
cs = std::make_shared<dwio::common::ColumnSelector>(fileType, columnNames);
cs = std::make_shared<dwio::common::ColumnSelector>(
fileType, columnNames, nullptr, readerOpts_.isCaseSensitive());
}

rowReader_ = reader_->createRowReader(
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "velox/expression/Expr.h"
#include "velox/type/Filter.h"
#include "velox/type/Subfield.h"
#include "velox/connectors/hive/HiveConfig.h"

namespace facebook::velox::connector::hive {

Expand Down Expand Up @@ -112,6 +113,7 @@ class HiveDataSource : public DataSource {
ExpressionEvaluator* FOLLY_NONNULL expressionEvaluator,
memory::MemoryAllocator* FOLLY_NONNULL allocator,
const std::string& scanId,
bool caseSensitive,
folly::Executor* FOLLY_NULLABLE executor);

void addSplit(std::shared_ptr<ConnectorSplit> split) override;
Expand Down Expand Up @@ -221,6 +223,7 @@ class HiveConnector final : public Connector {
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->allocator(),
connectorQueryCtx->scanId(),
HiveConfig::isCaseSensitive(connectorQueryCtx->config()),
executor_);
}

Expand Down
52 changes: 42 additions & 10 deletions velox/dwio/common/ColumnSelector.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,21 @@ class ColumnSelector {
*/
explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, log, caseSensitive) {}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)}, schema_{schema}, state_{ReadState::kAll} {
buildNodes(schema, contentSchema);

// no filter, read everything
setReadAll();
checkSelectColDuplicate(caseSensitive);
}

/**
Expand All @@ -77,18 +80,21 @@ class ColumnSelector {
explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::vector<std::string>& names,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, names, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, names, log, caseSensitive) {}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
const std::vector<std::string>& names,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)},
schema_{schema},
state_{names.empty() ? ReadState::kAll : ReadState::kPartial} {
acceptFilter(schema, contentSchema, names);
acceptFilter(schema, contentSchema, names, false);
checkSelectColDuplicate(caseSensitive);
}

/**
Expand All @@ -98,19 +104,23 @@ class ColumnSelector {
const std::shared_ptr<const velox::RowType>& schema,
const std::vector<uint64_t>& ids,
const bool filterByNodes = false,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, ids, filterByNodes, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, ids, filterByNodes, log, caseSensitive) {
}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
const std::vector<uint64_t>& ids,
const bool filterByNodes = false,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)},
schema_{schema},
state_{ids.empty() ? ReadState::kAll : ReadState::kPartial} {
acceptFilter(schema, contentSchema, ids, filterByNodes);
checkSelectColDuplicate(caseSensitive);
}

// set a specific node to read state
Expand Down Expand Up @@ -301,6 +311,28 @@ class ColumnSelector {
// get node ID list to be read
std::vector<uint64_t> getNodeFilter() const;

/// Rejects an ambiguous column selection in case-insensitive mode.
///
/// Does nothing when 'caseSensitive' is true. Otherwise counts how many
/// nodes in 'nodes_' carry each name and fails with a user error if any
/// entry of 'filter_' refers to a name that occurs more than once.
///
/// The counting pass and the filter pass are deliberately separate: the
/// counts are final before any filter is examined, so every filter is
/// checked exactly once instead of once per node. Whether the check fails
/// is unchanged; only which duplicate is named in the message may differ
/// when several selected names are duplicated.
///
/// NOTE(review): names are compared exactly as stored; this presumably
/// relies on the reader having normalized the case of the schema names
/// beforehand - confirm for every file format that uses this selector.
void checkSelectColDuplicate(bool caseSensitive) {
  if (caseSensitive) {
    return;
  }

  // Pass 1: occurrences of every node name.
  std::unordered_map<std::string, int> nameCounts;
  for (const auto& node : nodes_) {
    ++nameCounts[node->getNode().name];
  }

  // Pass 2: a selected name must be unique. find() is used instead of
  // operator[] so that the lookup does not insert new entries.
  for (const auto& filter : filter_) {
    const auto it = nameCounts.find(filter.name);
    if (it != nameCounts.end() && it->second > 1) {
      VELOX_USER_FAIL(
          "Found duplicate field(s) {} in case-insensitive mode",
          filter.name);
    }
  }
}

// accept filter
template <typename T>
void acceptFilter(
Expand Down
13 changes: 12 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class ReaderOptions {
int32_t maxCoalesceDistance_{kDefaultCoalesceDistance};
SerDeOptions serDeOptions;
std::shared_ptr<encryption::DecrypterFactory> decrypterFactory_;
bool caseSensitive;

public:
static constexpr int32_t kDefaultLoadQuantum = 8 << 20; // 8MB
Expand All @@ -357,7 +358,8 @@ class ReaderOptions {
fileFormat(FileFormat::UNKNOWN),
fileSchema(nullptr),
autoPreloadLength(DEFAULT_AUTO_PRELOAD_SIZE),
prefetchMode(PrefetchMode::PREFETCH) {
prefetchMode(PrefetchMode::PREFETCH),
caseSensitive(true) {
// PASS
}

Expand Down Expand Up @@ -467,6 +469,11 @@ class ReaderOptions {
return *this;
}

/**
 * Set the case-sensitivity flag that isCaseSensitive() reports.
 * @param caseSensitiveMode the new value of the flag.
 * @return this object, so that setter calls can be chained.
 */
ReaderOptions& setCaseSensitive(bool caseSensitiveMode) {
  this->caseSensitive = caseSensitiveMode;
  return *this;
}

/**
* Get the desired tail location.
* @return if not set, return the maximum long.
Expand Down Expand Up @@ -524,6 +531,10 @@ class ReaderOptions {
const {
return decrypterFactory_;
}

const bool isCaseSensitive() const {
return caseSensitive;
}
};

} // namespace common
Expand Down
20 changes: 20 additions & 0 deletions velox/dwio/common/tests/TestColumnSelector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/type/fbhive/HiveTypeParser.h"
#include "velox/type/Type.h"
#include "velox/common/base/VeloxException.h"

using namespace facebook::velox::dwio::common;
using facebook::velox::RowType;
Expand Down Expand Up @@ -630,3 +631,22 @@ TEST(TestColumnSelector, testNonexistingColFilters) {
std::vector<std::string>{"id", "values", "notexists#[10,20,30,40]"}),
std::runtime_error);
}

// Selecting a column whose name appears twice in the schema must be rejected
// when the selector is constructed with caseSensitive = false.
TEST(TestColumnSelector, testCaseInsensitiveDuplicateColFilters) {
  // The schema declares "id" twice on purpose.
  const auto parsedType =
      HiveTypeParser().parse("struct<"
                             "id:bigint"
                             "id:bigint"
                             "values:array<float>"
                             "tags:map<int, string>"
                             "notes:struct<f1:int, f2:double, f3:string>"
                             "memo:string"
                             "extra:string>");
  const auto schema = std::dynamic_pointer_cast<const RowType>(parsedType);
  const std::vector<std::string> selectedNames{"id"};

  EXPECT_THROW(
      ColumnSelector cs(
          schema, selectedNames, /*log=*/nullptr, /*caseSensitive=*/false),
      facebook::velox::VeloxException);
}

18 changes: 12 additions & 6 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/thrift/ThriftTransport.h"

#include <folly/String.h>

namespace facebook::velox::parquet {

ReaderBase::ReaderBase(
Expand Down Expand Up @@ -110,7 +112,7 @@ void ReaderBase::initializeSchema() {
uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1;
schemaWithId_ = getParquetColumnInfo(
maxSchemaElementIdx, maxRepeat, maxDefine, schemaIdx, columnIdx);
schema_ = createRowType(schemaWithId_->getChildren());
schema_ = createRowType(schemaWithId_->getChildren(), isCaseSensitive());
}

std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
Expand Down Expand Up @@ -229,7 +231,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
// Row type
auto childrenCopy = children;
return std::make_shared<const ParquetTypeWithId>(
createRowType(children),
createRowType(children, isCaseSensitive()),
std::move(childrenCopy),
curSchemaIdx,
maxSchemaElementIdx,
Expand Down Expand Up @@ -427,13 +429,17 @@ TypePtr ReaderBase::convertType(
}

std::shared_ptr<const RowType> ReaderBase::createRowType(
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>>
children) {
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>> children,
bool caseSensitive) {
std::vector<std::string> childNames;
std::vector<TypePtr> childTypes;
for (auto& child : children) {
childNames.push_back(
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_);
auto childName =
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_;
if (!caseSensitive) {
folly::toLowerAscii(childName);
}
childNames.push_back(childName);
childTypes.push_back(child->type);
}
return TypeFactory<TypeKind::ROW>::create(
Expand Down
6 changes: 5 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class ReaderBase {
return schemaWithId_;
}

const bool isCaseSensitive() const {
return options_.isCaseSensitive();
}

/// Ensures that streams are enqueued and loading for the row group at
/// 'currentGroup'. May start loading one or more subsequent groups.
void scheduleRowGroups(
Expand Down Expand Up @@ -100,7 +104,7 @@ class ReaderBase {

static std::shared_ptr<const RowType> createRowType(
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>>
children);
children, bool caseSensitive = true);

memory::MemoryPool& pool_;
const dwio::common::ReaderOptions& options_;
Expand Down
Binary file added velox/dwio/parquet/tests/examples/upper.parquet
Binary file not shown.
20 changes: 20 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ TEST_F(ParquetReaderTest, parseSample) {
EXPECT_EQ(type->childByName("b"), col1);
}

TEST_F(ParquetReaderTest, parseInCaseSensitive) {
  // upper.parquet holds two BIGINT columns and 2 rows. With case sensitivity
  // switched off, both columns must be reachable through the lower-case
  // names "a" and "b".
  // NOTE(review): judging by the file name, at least one column name is
  // stored in upper case in the file - confirm against the fixture.
  const std::string upperCaseFile(getExampleFilePath("upper.parquet"));

  ReaderOptions readerOptions{defaultPool.get()};
  readerOptions.setCaseSensitive(false);
  ParquetReader reader = createReader(upperCaseFile, readerOptions);
  EXPECT_EQ(reader.numberOfRows(), 2ULL);

  auto type = reader.typeWithId();
  EXPECT_EQ(type->size(), 2ULL);
  auto col0 = type->childAt(0);
  EXPECT_EQ(col0->type->kind(), TypeKind::BIGINT);
  auto col1 = type->childAt(1);
  EXPECT_EQ(col1->type->kind(), TypeKind::BIGINT);
  // Lookups by lower-case name resolve to the columns in file order.
  EXPECT_EQ(type->childByName("a"), col0);
  EXPECT_EQ(type->childByName("b"), col1);
}

TEST_F(ParquetReaderTest, parseEmpty) {
// empty.parquet holds two columns (a: BIGINT, b: DOUBLE) and
// 0 rows.
Expand Down