Skip to content

Commit

Permalink
SplitReader refactor (#8995)
Browse files Browse the repository at this point in the history
Summary:
To prepare for the upcoming support for reading equality delete files, we need to refactor the SplitReader slightly.

Pull Request resolved: #8995

Reviewed By: xiaoxmeng

Differential Revision: D55072998

Pulled By: Yuhta

fbshipit-source-id: 662459041b947d51ffaa98b57a50e4ebdd5b36e3
  • Loading branch information
yingsu00 authored and facebook-github-bot committed Apr 10, 2024
1 parent a9897b1 commit b5ea2d7
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 230 deletions.
29 changes: 17 additions & 12 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,10 @@ std::unique_ptr<dwio::common::SerDeOptions> parseSerdeParameters(

void configureReaderOptions(
dwio::common::ReaderOptions& readerOptions,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const Config* sessionProperties,
const std::shared_ptr<HiveTableHandle>& hiveTableHandle,
const std::shared_ptr<HiveConnectorSplit>& hiveSplit) {
const std::shared_ptr<const HiveTableHandle>& hiveTableHandle,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit) {
configureReaderOptions(
readerOptions,
hiveConfig,
Expand All @@ -465,10 +465,10 @@ void configureReaderOptions(

void configureReaderOptions(
dwio::common::ReaderOptions& readerOptions,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const Config* sessionProperties,
const RowTypePtr& fileSchema,
const std::shared_ptr<HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::unordered_map<std::string, std::string>& tableParameters) {
readerOptions.setLoadQuantum(hiveConfig->loadQuantum());
readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes());
Expand Down Expand Up @@ -502,10 +502,10 @@ void configureReaderOptions(
void configureRowReaderOptions(
dwio::common::RowReaderOptions& rowReaderOptions,
const std::unordered_map<std::string, std::string>& tableParameters,
std::shared_ptr<common::ScanSpec> scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<common::ScanSpec>& scanSpec,
const std::shared_ptr<common::MetadataFilter>& metadataFilter,
const RowTypePtr& rowType,
std::shared_ptr<HiveConnectorSplit> hiveSplit) {
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit) {
auto skipRowsIt =
tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount);
if (skipRowsIt != tableParameters.end()) {
Expand Down Expand Up @@ -569,13 +569,15 @@ bool applyPartitionFilter(
} // namespace

bool testFilters(
common::ScanSpec* scanSpec,
dwio::common::Reader* reader,
const common::ScanSpec* scanSpec,
const dwio::common::Reader* reader,
const std::string& filePath,
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKey,
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle) {
VELOX_CHECK_EQ(partitionKey.size(), partitionKeysHandle.size());

auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
const auto& rowType = reader->rowType();
Expand All @@ -586,8 +588,11 @@ bool testFilters(
// If missing column is partition key.
auto iter = partitionKey.find(name);
if (iter != partitionKey.end() && iter->second.has_value()) {
auto handlesIter = partitionKeysHandle.find(name);
VELOX_CHECK(handlesIter != partitionKeysHandle.end());

return applyPartitionFilter(
(*partitionKeysHandle)[name]->dataType()->kind(),
handlesIter->second->dataType()->kind(),
iter->second.value(),
child->filter());
}
Expand Down
22 changes: 11 additions & 11 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,34 +61,34 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(

void configureReaderOptions(
dwio::common::ReaderOptions& readerOptions,
const std::shared_ptr<HiveConfig>& config,
const std::shared_ptr<const HiveConfig>& config,
const Config* sessionProperties,
const std::shared_ptr<HiveTableHandle>& hiveTableHandle,
const std::shared_ptr<HiveConnectorSplit>& hiveSplit);
const std::shared_ptr<const HiveTableHandle>& hiveTableHandle,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit);

void configureReaderOptions(
dwio::common::ReaderOptions& readerOptions,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const Config* sessionProperties,
const RowTypePtr& fileSchema,
const std::shared_ptr<HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::unordered_map<std::string, std::string>& tableParameters = {});

void configureRowReaderOptions(
dwio::common::RowReaderOptions& rowReaderOptions,
const std::unordered_map<std::string, std::string>& tableParameters,
std::shared_ptr<common::ScanSpec> scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<common::ScanSpec>& scanSpec,
const std::shared_ptr<common::MetadataFilter>& metadataFilter,
const RowTypePtr& rowType,
std::shared_ptr<HiveConnectorSplit> hiveSplit);
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit);

bool testFilters(
common::ScanSpec* scanSpec,
dwio::common::Reader* reader,
const common::ScanSpec* scanSpec,
const dwio::common::Reader* reader,
const std::string& filePath,
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKey,
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle);

std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
Expand Down
10 changes: 5 additions & 5 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
return SplitReader::create(
split_,
hiveTableHandle_,
scanSpec_,
readerOutputType_,
&partitionKeys_,
fileHandleFactory_,
executor_,
connectorQueryCtx_,
hiveConfig_,
ioStats_);
readerOutputType_,
ioStats_,
fileHandleFactory_,
executor_,
scanSpec_);
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
Expand Down
Loading

0 comments on commit b5ea2d7

Please sign in to comment.