Skip to content

Commit ed53e4a

Browse files
jinchengchenghh and zhejiangxiaomai
authored and committed
Support parquet read case sensitive mode (#126)
1 parent fb252e9 commit ed53e4a

File tree

11 files changed

+131
-24
lines changed

11 files changed

+131
-24
lines changed

velox/connectors/hive/HiveConfig.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,8 @@ uint32_t HiveConfig::maxPartitionsPerWriters(const Config* config) {
5252
return config->get<uint32_t>(kMaxPartitionsPerWriters, 100);
5353
}
5454

55+
bool HiveConfig::isCaseSensitive(const Config* config) {
56+
return config->get<bool>(kCaseSensitive, true);
57+
}
58+
5559
} // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveConfig.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ class HiveConfig {
3939
const Config* config);
4040

4141
static uint32_t maxPartitionsPerWriters(const Config* config);
42+
static constexpr const char* kCaseSensitive = "case_sensitive";
43+
44+
static bool isCaseSensitive(const Config* config);
4245
};
4346

4447
} // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveConnector.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ HiveDataSource::HiveDataSource(
118118
ExpressionEvaluator* expressionEvaluator,
119119
memory::MemoryAllocator* allocator,
120120
const std::string& scanId,
121+
bool caseSensitive,
121122
folly::Executor* executor)
122123
: outputType_(outputType),
123124
fileHandleFactory_(fileHandleFactory),
@@ -196,6 +197,8 @@ HiveDataSource::HiveDataSource(
196197
readerOutputType_ = ROW(std::move(names), std::move(types));
197198
}
198199

200+
readerOpts_.setCaseSensitive(caseSensitive);
201+
199202
rowReaderOpts_.setScanSpec(scanSpec_);
200203
rowReaderOpts_.setMetadataFilter(metadataFilter_);
201204

@@ -257,7 +260,8 @@ template <TypeKind ToKind>
257260
velox::variant convertFromString(const std::optional<std::string>& value) {
258261
if (value.has_value()) {
259262
// No need for casting if ToKind is VARCHAR or VARBINARY.
260-
if constexpr (ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
263+
if constexpr (
264+
ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
261265
return velox::variant(value.value());
262266
}
263267
bool nullOutput = false;
@@ -398,10 +402,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
398402
scanSpec_->resetCachedValues();
399403

400404
// Check filters and see if the whole split can be skipped
401-
if (!testFilters(
402-
scanSpec_.get(),
403-
reader_.get(),
404-
split_->filePath)) {
405+
if (!testFilters(scanSpec_.get(), reader_.get(), split_->filePath)) {
405406
emptySplit_ = true;
406407
++runtimeStats_.skippedSplits;
407408
runtimeStats_.skippedSplitBytes += split_->length;
@@ -420,7 +421,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
420421
static const RowTypePtr kEmpty{ROW({}, {})};
421422
cs = std::make_shared<dwio::common::ColumnSelector>(kEmpty);
422423
} else {
423-
cs = std::make_shared<dwio::common::ColumnSelector>(fileType, columnNames);
424+
cs = std::make_shared<dwio::common::ColumnSelector>(
425+
fileType, columnNames, nullptr, readerOpts_.isCaseSensitive());
424426
}
425427

426428
rowReader_ = reader_->createRowReader(

velox/connectors/hive/HiveConnector.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "velox/expression/Expr.h"
2828
#include "velox/type/Filter.h"
2929
#include "velox/type/Subfield.h"
30+
#include "velox/connectors/hive/HiveConfig.h"
3031

3132
namespace facebook::velox::connector::hive {
3233

@@ -112,6 +113,7 @@ class HiveDataSource : public DataSource {
112113
ExpressionEvaluator* FOLLY_NONNULL expressionEvaluator,
113114
memory::MemoryAllocator* FOLLY_NONNULL allocator,
114115
const std::string& scanId,
116+
bool caseSensitive,
115117
folly::Executor* FOLLY_NULLABLE executor);
116118

117119
void addSplit(std::shared_ptr<ConnectorSplit> split) override;
@@ -228,6 +230,7 @@ class HiveConnector final : public Connector {
228230
connectorQueryCtx->expressionEvaluator(),
229231
connectorQueryCtx->allocator(),
230232
connectorQueryCtx->scanId(),
233+
HiveConfig::isCaseSensitive(connectorQueryCtx->config()),
231234
executor_);
232235
}
233236

velox/dwio/common/ColumnSelector.h

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,21 @@ class ColumnSelector {
5757
*/
5858
explicit ColumnSelector(
5959
const std::shared_ptr<const velox::RowType>& schema,
60-
const MetricsLogPtr& log = nullptr)
61-
: ColumnSelector(schema, schema, log) {}
60+
const MetricsLogPtr& log = nullptr,
61+
const bool caseSensitive = true)
62+
: ColumnSelector(schema, schema, log, caseSensitive) {}
6263

6364
explicit ColumnSelector(
6465
const std::shared_ptr<const velox::RowType>& schema,
6566
const std::shared_ptr<const velox::RowType>& contentSchema,
66-
MetricsLogPtr log = nullptr)
67+
MetricsLogPtr log = nullptr,
68+
const bool caseSensitive = true)
6769
: log_{std::move(log)}, schema_{schema}, state_{ReadState::kAll} {
6870
buildNodes(schema, contentSchema);
6971

7072
// no filter, read everything
7173
setReadAll();
74+
checkSelectColDuplicate(caseSensitive);
7275
}
7376

7477
/**
@@ -77,18 +80,21 @@ class ColumnSelector {
7780
explicit ColumnSelector(
7881
const std::shared_ptr<const velox::RowType>& schema,
7982
const std::vector<std::string>& names,
80-
const MetricsLogPtr& log = nullptr)
81-
: ColumnSelector(schema, schema, names, log) {}
83+
const MetricsLogPtr& log = nullptr,
84+
const bool caseSensitive = true)
85+
: ColumnSelector(schema, schema, names, log, caseSensitive) {}
8286

8387
explicit ColumnSelector(
8488
const std::shared_ptr<const velox::RowType>& schema,
8589
const std::shared_ptr<const velox::RowType>& contentSchema,
8690
const std::vector<std::string>& names,
87-
MetricsLogPtr log = nullptr)
91+
MetricsLogPtr log = nullptr,
92+
const bool caseSensitive = true)
8893
: log_{std::move(log)},
8994
schema_{schema},
9095
state_{names.empty() ? ReadState::kAll : ReadState::kPartial} {
91-
acceptFilter(schema, contentSchema, names);
96+
acceptFilter(schema, contentSchema, names, false);
97+
checkSelectColDuplicate(caseSensitive);
9298
}
9399

94100
/**
@@ -98,19 +104,23 @@ class ColumnSelector {
98104
const std::shared_ptr<const velox::RowType>& schema,
99105
const std::vector<uint64_t>& ids,
100106
const bool filterByNodes = false,
101-
const MetricsLogPtr& log = nullptr)
102-
: ColumnSelector(schema, schema, ids, filterByNodes, log) {}
107+
const MetricsLogPtr& log = nullptr,
108+
const bool caseSensitive = true)
109+
: ColumnSelector(schema, schema, ids, filterByNodes, log, caseSensitive) {
110+
}
103111

104112
explicit ColumnSelector(
105113
const std::shared_ptr<const velox::RowType>& schema,
106114
const std::shared_ptr<const velox::RowType>& contentSchema,
107115
const std::vector<uint64_t>& ids,
108116
const bool filterByNodes = false,
109-
MetricsLogPtr log = nullptr)
117+
MetricsLogPtr log = nullptr,
118+
const bool caseSensitive = true)
110119
: log_{std::move(log)},
111120
schema_{schema},
112121
state_{ids.empty() ? ReadState::kAll : ReadState::kPartial} {
113122
acceptFilter(schema, contentSchema, ids, filterByNodes);
123+
checkSelectColDuplicate(caseSensitive);
114124
}
115125

116126
// set a specific node to read state
@@ -301,6 +311,28 @@ class ColumnSelector {
301311
// get node ID list to be read
302312
std::vector<uint64_t> getNodeFilter() const;
303313

314+
void checkSelectColDuplicate(bool caseSensitive) {
315+
if (caseSensitive) {
316+
return;
317+
}
318+
std::unordered_map<std::string, int> names;
319+
for (auto node : nodes_) {
320+
auto name = node->getNode().name;
321+
if (names.find(name) == names.end()) {
322+
names[name] = 1;
323+
} else {
324+
names[name] = names[name] + 1;
325+
}
326+
for (auto filter : filter_) {
327+
if (names[filter.name] > 1) {
328+
VELOX_USER_FAIL(
329+
"Found duplicate field(s) {} in case-insensitive mode",
330+
filter.name);
331+
}
332+
}
333+
}
334+
}
335+
304336
// accept filter
305337
template <typename T>
306338
void acceptFilter(

velox/dwio/common/Options.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,10 @@ class ReaderOptions {
346346
int32_t maxCoalesceDistance_{kDefaultCoalesceDistance};
347347
SerDeOptions serDeOptions;
348348
std::shared_ptr<encryption::DecrypterFactory> decrypterFactory_;
349+
349350
uint64_t directorySizeGuess{kDefaultDirectorySizeGuess};
350351
uint64_t filePreloadThreshold{kDefaultFilePreloadThreshold};
352+
bool caseSensitive;
351353

352354
public:
353355
static constexpr int32_t kDefaultLoadQuantum = 8 << 20; // 8MB
@@ -362,7 +364,8 @@ class ReaderOptions {
362364
fileFormat(FileFormat::UNKNOWN),
363365
fileSchema(nullptr),
364366
autoPreloadLength(DEFAULT_AUTO_PRELOAD_SIZE),
365-
prefetchMode(PrefetchMode::PREFETCH) {
367+
prefetchMode(PrefetchMode::PREFETCH),
368+
caseSensitive(true) {
366369
// PASS
367370
}
368371

@@ -484,6 +487,12 @@ class ReaderOptions {
484487
return *this;
485488
}
486489

490+
/**
 * Set the case-sensitivity flag stored in these options.
 * @param caseSensitiveMode value to store
 * @return this object, so calls can be chained
 */
ReaderOptions& setCaseSensitive(bool caseSensitiveMode) {
  this->caseSensitive = caseSensitiveMode;
  return *this;
}
495+
487496
/**
488497
* Get the desired tail location.
489498
* @return if not set, return the maximum long.
@@ -549,6 +558,10 @@ class ReaderOptions {
549558
uint64_t getFilePreloadThreshold() const {
550559
return filePreloadThreshold;
551560
}
561+
562+
const bool isCaseSensitive() const {
563+
return caseSensitive;
564+
}
552565
};
553566

554567
} // namespace common

velox/dwio/common/tests/TestColumnSelector.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "velox/dwio/common/ColumnSelector.h"
1919
#include "velox/dwio/type/fbhive/HiveTypeParser.h"
2020
#include "velox/type/Type.h"
21+
#include "velox/common/base/VeloxException.h"
2122

2223
using namespace facebook::velox::dwio::common;
2324
using facebook::velox::RowType;
@@ -630,3 +631,22 @@ TEST(TestColumnSelector, testNonexistingColFilters) {
630631
std::vector<std::string>{"id", "values", "notexists#[10,20,30,40]"}),
631632
std::runtime_error);
632633
}
634+
635+
// Building a selector with caseSensitive = false must throw when the selected
// column name ("id") appears more than once in the schema.
TEST(TestColumnSelector, testCaseInsensitiveDuplicateColFilters) {
  // "id" is intentionally declared twice.
  const auto parsedType = HiveTypeParser().parse(
      "struct<"
      "id:bigint"
      "id:bigint"
      "values:array<float>"
      "tags:map<int, string>"
      "notes:struct<f1:int, f2:double, f3:string>"
      "memo:string"
      "extra:string>");
  const auto schema = std::dynamic_pointer_cast<const RowType>(parsedType);

  const std::vector<std::string> selectedColumns{"id"};
  EXPECT_THROW(
      ColumnSelector cs(schema, selectedColumns, nullptr, false),
      facebook::velox::VeloxException);
}
652+

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "velox/dwio/parquet/reader/StructColumnReader.h"
2222
#include "velox/dwio/parquet/thrift/ThriftTransport.h"
2323

24+
#include <folly/String.h>
25+
2426
namespace facebook::velox::parquet {
2527

2628
ReaderBase::ReaderBase(
@@ -112,7 +114,7 @@ void ReaderBase::initializeSchema() {
112114
uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1;
113115
schemaWithId_ = getParquetColumnInfo(
114116
maxSchemaElementIdx, maxRepeat, maxDefine, schemaIdx, columnIdx);
115-
schema_ = createRowType(schemaWithId_->getChildren());
117+
schema_ = createRowType(schemaWithId_->getChildren(), isCaseSensitive());
116118
}
117119

118120
std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
@@ -231,7 +233,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
231233
// Row type
232234
auto childrenCopy = children;
233235
return std::make_shared<const ParquetTypeWithId>(
234-
createRowType(children),
236+
createRowType(children, isCaseSensitive()),
235237
std::move(childrenCopy),
236238
curSchemaIdx,
237239
maxSchemaElementIdx,
@@ -429,13 +431,17 @@ TypePtr ReaderBase::convertType(
429431
}
430432

431433
std::shared_ptr<const RowType> ReaderBase::createRowType(
432-
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>>
433-
children) {
434+
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>> children,
435+
bool caseSensitive) {
434436
std::vector<std::string> childNames;
435437
std::vector<TypePtr> childTypes;
436438
for (auto& child : children) {
437-
childNames.push_back(
438-
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_);
439+
auto childName =
440+
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_;
441+
if (!caseSensitive) {
442+
folly::toLowerAscii(childName);
443+
}
444+
childNames.push_back(childName);
439445
childTypes.push_back(child->type);
440446
}
441447
return TypeFactory<TypeKind::ROW>::create(

velox/dwio/parquet/reader/ParquetReader.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ class ReaderBase {
6666
return schemaWithId_;
6767
}
6868

69+
const bool isCaseSensitive() const {
70+
return options_.isCaseSensitive();
71+
}
72+
6973
/// Ensures that streams are enqueued and loading for the row group at
7074
/// 'currentGroup'. May start loading one or more subsequent groups.
7175
void scheduleRowGroups(
@@ -97,7 +101,7 @@ class ReaderBase {
97101

98102
static std::shared_ptr<const RowType> createRowType(
99103
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>>
100-
children);
104+
children, bool caseSensitive = true);
101105

102106
memory::MemoryPool& pool_;
103107
const uint64_t directorySizeGuess_;
720 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)