-
Notifications
You must be signed in to change notification settings - Fork 1.4k
feat: Implement dictionary-based row group skipping for dictionary-encoded data #14907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1aa7915
cd59333
b5b46c6
319426a
f508d78
2706c36
0206655
9beb150
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,19 +13,55 @@ | |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "velox/dwio/parquet/reader/ParquetData.h" | ||
#include <thrift/protocol/TCompactProtocol.h> | ||
#include <iomanip> | ||
#include "velox/dwio/parquet/thrift/ThriftTransport.h" | ||
|
||
#include "velox/dwio/common/BufferedInput.h" | ||
#include "velox/dwio/parquet/reader/ParquetStatsContext.h" | ||
|
||
namespace facebook::velox::parquet { | ||
|
||
namespace { | ||
|
||
// Helper methods for EncodingStats analysis (like Java Presto) | ||
// Returns true when the page-level encoding statistics record at least one
// dictionary page in the column chunk.
bool hasDictionaryPages(const std::vector<thrift::PageEncodingStats>& stats) {
  for (auto it = stats.begin(); it != stats.end(); ++it) {
    if (it->page_type == thrift::PageType::DICTIONARY_PAGE) {
      return true;
    }
  }
  return false;
}
|
||
bool hasNonDictionaryEncodedPages( | ||
const std::vector<thrift::PageEncodingStats>& stats) { | ||
for (const auto& pageStats : stats) { | ||
if (pageStats.page_type == thrift::PageType::DATA_PAGE || | ||
pageStats.page_type == thrift::PageType::DATA_PAGE_V2) { | ||
// Check if this data page uses non-dictionary encoding | ||
if (pageStats.encoding != thrift::Encoding::PLAIN_DICTIONARY && | ||
pageStats.encoding != thrift::Encoding::RLE_DICTIONARY) { | ||
return true; | ||
} | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
} // namespace | ||
|
||
std::unique_ptr<dwio::common::FormatData> ParquetParams::toFormatData( | ||
const std::shared_ptr<const dwio::common::TypeWithId>& type, | ||
const common::ScanSpec& /*scanSpec*/) { | ||
return std::make_unique<ParquetData>( | ||
type, metaData_, pool(), sessionTimezone_); | ||
auto parquetData = | ||
std::make_unique<ParquetData>(type, metaData_, pool(), sessionTimezone_); | ||
// Set the BufferedInput if available | ||
if (bufferedInput_) { | ||
parquetData->setBufferedInput(bufferedInput_); | ||
} | ||
return parquetData; | ||
} | ||
|
||
void ParquetData::filterRowGroups( | ||
|
@@ -86,14 +122,29 @@ bool ParquetData::rowGroupMatches( | |
if (columnChunk.hasStatistics()) { | ||
auto columnStats = | ||
columnChunk.getColumnStatistics(type, rowGroup.numRows()); | ||
return testFilter(filter, columnStats.get(), rowGroup.numRows(), type); | ||
bool statisticsResult = | ||
testFilter(filter, columnStats.get(), rowGroup.numRows(), type); | ||
if (!statisticsResult) { | ||
return false; | ||
} | ||
} | ||
bool canUseDictionaryFiltering = | ||
isOnlyDictionaryEncodingPagesImpl(columnChunk); | ||
if (canUseDictionaryFiltering) { | ||
bool dictionaryResult = | ||
testFilterAgainstDictionary(rowGroupId, filter, columnChunk); | ||
if (!dictionaryResult) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
void ParquetData::enqueueRowGroup( | ||
uint32_t index, | ||
dwio::common::BufferedInput& input) { | ||
bufferedInput_ = &input; | ||
|
||
auto chunk = fileMetaDataPtr_.rowGroup(index).columnChunk(type_->column()); | ||
streams_.resize(fileMetaDataPtr_.numRowGroups()); | ||
VELOX_CHECK( | ||
|
@@ -150,4 +201,157 @@ std::pair<int64_t, int64_t> ParquetData::getRowGroupRegion( | |
return {fileOffset, length}; | ||
} | ||
|
||
// Presto's exact isOnlyDictionaryEncodingPages function from PR #4779 | ||
bool ParquetData::isOnlyDictionaryEncodingPagesImpl( | ||
const ColumnChunkMetaDataPtr& columnChunk) { | ||
// Files written with newer versions of Parquet libraries (e.g. | ||
// parquet-mr 1.9.0) will have EncodingStats available Otherwise, fallback to | ||
// v1 logic | ||
|
||
// Check for EncodingStats when available (newer Parquet files) | ||
if (columnChunk.hasEncodingStats()) { | ||
const auto& stats = columnChunk.getEncodingStats(); | ||
return hasDictionaryPages(stats) && !hasNonDictionaryEncodedPages(stats); | ||
} | ||
|
||
// Fallback to v1 logic | ||
auto encodings = columnChunk.getEncodings(); | ||
std::set<thrift::Encoding::type> encodingSet( | ||
encodings.begin(), encodings.end()); | ||
|
||
if (encodingSet.count(thrift::Encoding::PLAIN_DICTIONARY)) { | ||
// PLAIN_DICTIONARY was present, which means at least one page was | ||
// dictionary-encoded and 1.0 encodings are used | ||
// The only other allowed encodings are RLE and BIT_PACKED which are used | ||
// for repetition or definition levels | ||
std::set<thrift::Encoding::type> allowedEncodings = { | ||
thrift::Encoding::PLAIN_DICTIONARY, | ||
thrift::Encoding::RLE, // For repetition/definition levels | ||
thrift::Encoding::BIT_PACKED // For repetition/definition levels | ||
}; | ||
|
||
// Check if there are any disallowed encodings (equivalent to | ||
// Sets.difference in Java) | ||
for (const auto& encoding : encodings) { | ||
if (allowedEncodings.find(encoding) == allowedEncodings.end()) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
bool ParquetData::testFilterAgainstDictionary( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Main change which reads the first page of column chunk and applies filters |
||
uint32_t rowGroupId, | ||
const common::Filter* filter, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this a pointer and not a reference ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its actually passed from the upstream function which i didn't change since it was not related to this PR |
||
const ColumnChunkMetaDataPtr& columnChunk) { | ||
if (filter->kind() == common::FilterKind::kIsNull) { | ||
return true; // Conservative include for IsNull filters | ||
} | ||
|
||
auto dictionaryPtr = readDictionaryPageForFiltering(rowGroupId, columnChunk); | ||
|
||
auto numValues = dictionaryPtr->numValues; | ||
const void* dictPtr = dictionaryPtr->values->as<void>(); | ||
|
||
// Test if any dictionary value passes the filter | ||
auto testDict = [&]<typename T>() { | ||
const T* dict = reinterpret_cast<const T*>(dictPtr); | ||
|
||
// For larger dictionaries, we could use SIMD testValues() for better | ||
// performance For now, use simple scalar approach which is sufficient for | ||
// typical dictionary sizes | ||
for (int32_t i = 0; i < numValues; ++i) { | ||
if (common::applyFilter(*filter, dict[i])) | ||
return true; | ||
} | ||
return false; | ||
}; | ||
|
||
bool anyValuePasses = [&] { | ||
switch (type_->type()->kind()) { | ||
case TypeKind::BIGINT: | ||
return testDict.operator()<int64_t>(); | ||
case TypeKind::INTEGER: | ||
return testDict.operator()<int32_t>(); | ||
case TypeKind::VARCHAR: | ||
case TypeKind::VARBINARY: | ||
return testDict.operator()<StringView>(); | ||
case TypeKind::REAL: | ||
return testDict.operator()<float>(); | ||
case TypeKind::DOUBLE: | ||
return testDict.operator()<double>(); | ||
case TypeKind::BOOLEAN: | ||
return testDict.operator()<bool>(); | ||
default: | ||
return true; // Conservative fallback | ||
} | ||
}(); | ||
if (!anyValuePasses && filter->testNull()) { | ||
anyValuePasses = true; | ||
} | ||
return anyValuePasses; | ||
} | ||
|
||
// Read dictionary page directly for row group filtering (like Presto's | ||
// dictionaryPredicatesMatch) | ||
std::unique_ptr<dwio::common::DictionaryValues> | ||
ParquetData::readDictionaryPageForFiltering( | ||
uint32_t rowGroupId, | ||
const ColumnChunkMetaDataPtr& columnChunk) { | ||
auto inputStream = getInputStream(rowGroupId, columnChunk); | ||
if (!inputStream) { | ||
return std::make_unique<dwio::common::DictionaryValues>(); | ||
} | ||
|
||
// Create PageReader - it will automatically handle dictionary loading | ||
auto pageReader = std::make_unique<PageReader>( | ||
std::move(inputStream), | ||
pool_, | ||
type_, | ||
columnChunk.compression(), | ||
columnChunk.totalCompressedSize(), | ||
sessionTimezone_); | ||
// Read the first page header to trigger dictionary loading | ||
auto pageHeader = pageReader->readPageHeader(); | ||
|
||
// If it's a dictionary page, prepare it | ||
if (pageHeader.type == thrift::PageType::DICTIONARY_PAGE) { | ||
pageReader->prepareDictionary(pageHeader); | ||
} else { | ||
return std::make_unique<dwio::common::DictionaryValues>(); | ||
} | ||
const auto& dict = pageReader->dictionary(); | ||
return std::make_unique<dwio::common::DictionaryValues>(dict); | ||
} | ||
|
||
std::unique_ptr<dwio::common::SeekableInputStream> ParquetData::getInputStream( | ||
uint32_t rowGroupId, | ||
const ColumnChunkMetaDataPtr& columnChunk) { | ||
// Create new stream using the same logic as enqueueRowGroup | ||
if (!bufferedInput_) { | ||
return nullptr; | ||
} | ||
|
||
// Calculate read parameters (same as enqueueRowGroup) | ||
uint64_t chunkReadOffset = columnChunk.dataPageOffset(); | ||
if (columnChunk.hasDictionaryPageOffset() && | ||
columnChunk.dictionaryPageOffset() >= 4) { | ||
chunkReadOffset = columnChunk.dictionaryPageOffset(); | ||
} | ||
|
||
uint64_t readSize = (columnChunk.compression() == | ||
common::CompressionKind::CompressionKind_NONE) | ||
? columnChunk.totalUncompressedSize() | ||
: columnChunk.totalCompressedSize(); | ||
|
||
auto id = dwio::common::StreamIdentifier(type_->column()); | ||
auto stream = bufferedInput_->enqueue({chunkReadOffset, readSize}, &id); | ||
|
||
bufferedInput_->load(dwio::common::LogType::STRIPE); | ||
return stream; | ||
} | ||
|
||
} // namespace facebook::velox::parquet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this related to your change ? Can this be an independent change ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is related, we need bufferInput_ to read the first page
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ParquetData::getInputStream