15 changes: 15 additions & 0 deletions velox/dwio/parquet/reader/Metadata.cpp
@@ -219,6 +219,21 @@ ColumnChunkMetaDataPtr::getColumnStatistics(
thriftColumnChunkPtr(ptr_)->meta_data.statistics, *type, numRows);
};

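// Returns true if the optional encoding_stats field is set in this column
// chunk's Thrift metadata.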
bool ColumnChunkMetaDataPtr::hasEncodingStats() const {
return hasMetadata() &&
thriftColumnChunkPtr(ptr_)->meta_data.__isset.encoding_stats;
}

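// Returns the per-page encoding statistics recorded in the column chunk
// metadata. Only meaningful when hasEncodingStats() returns true.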
const std::vector<thrift::PageEncodingStats>&
ColumnChunkMetaDataPtr::getEncodingStats() const {
return thriftColumnChunkPtr(ptr_)->meta_data.encoding_stats;
}

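// Returns the encodings used anywhere in this column chunk.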
const std::vector<thrift::Encoding::type>&
ColumnChunkMetaDataPtr::getEncodings() const {
return thriftColumnChunkPtr(ptr_)->meta_data.encodings;
}

std::string ColumnChunkMetaDataPtr::getColumnMetadataStatsMinValue() {
VELOX_CHECK(hasStatistics());
return thriftColumnChunkPtr(ptr_)->meta_data.statistics.min_value;
7 changes: 7 additions & 0 deletions velox/dwio/parquet/reader/Metadata.h
@@ -18,6 +18,7 @@

#include "velox/dwio/common/Statistics.h"
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {

@@ -37,6 +38,12 @@ class ColumnChunkMetaDataPtr {
/// Check the presence of the dictionary page offset in ColumnChunk metadata.
bool hasDictionaryPageOffset() const;

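/// Check the presence of page-level encoding statistics (EncodingStats) in
/// ColumnChunk metadata.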
bool hasEncodingStats() const;

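/// Return the per-page encoding statistics of the ColumnChunk.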
const std::vector<thrift::PageEncodingStats>& getEncodingStats() const;

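/// Return the encodings used in the ColumnChunk.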
const std::vector<thrift::Encoding::type>& getEncodings() const;

/// Return the ColumnChunk statistics.
std::unique_ptr<dwio::common::ColumnStatistics> getColumnStatistics(
const TypePtr type,
43 changes: 25 additions & 18 deletions velox/dwio/parquet/reader/PageReader.h
@@ -149,6 +149,31 @@ class PageReader {
return sessionTimezone_;
}

// Access the loaded dictionary for filtering purposes
const dwio::common::DictionaryValues& dictionary() const {
return dictionary_;
}

// Starts iterating over 'rows', which may span multiple pages. 'rows' are
// relative to current position, with 0 meaning the first
// unprocessed value in the current page, i.e. the row after the
// last row touched on a previous call to skip() or
// readWithVisitor(). On the first call, this is the first row of the
// first data page.
void startVisit(folly::Range<const vector_size_t*> rows);

// Seeks to the page containing 'row'. Returns the number of rows to skip
// on that page to reach 'row'. Clears state, positions the stream, and
// initializes a decoder for the found page. Passing kRepDefOnly as 'row'
// means only reading the repdefs for the next page. For a non-top-level
// column, 'row' is interpreted in terms of leaf rows, including leaf
// nulls. Seeking ahead of pages covered by decodeRepDefs is not allowed
// for non-top-level columns.
void seekToPage(int64_t row);

// Prepares dictionary from a dictionary page header
void prepareDictionary(const thrift::PageHeader& pageHeader);

private:
// Indicates that we only want the repdefs for the next page. Used when
// prereading repdefs with seekToPage.
@@ -174,15 +199,6 @@
// 'pageData_' + 'encodedDataSize_'.
void makedecoder();

// Reads and skips pages until finding a data page that contains
// 'row'. Reads and sets 'rowOfPage_' and 'numRowsInPage_' and
// initializes a decoder for the found page. row kRepDefOnly means
// getting repdefs for the next page. If non-top level column, 'row'
// is interpreted in terms of leaf rows, including leaf
// nulls. Seeking ahead of pages covered by decodeRepDefs is not
// allowed for non-top level columns.
void seekToPage(int64_t row);

// Preloads the repdefs for the column chunk. To avoid preloading,
// would need a way to clone the input stream so that one stream
// reads ahead for repdefs and the other tracks the data. This is
@@ -202,7 +218,6 @@

void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row);
void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row);
void prepareDictionary(const thrift::PageHeader& pageHeader);
void makeDecoder();

// For a non-top level leaf, reads the defs and sets 'leafNulls_' and
@@ -230,14 +245,6 @@
return data;
}

// Starts iterating over 'rows', which may span multiple pages. 'rows' are
// relative to current position, with 0 meaning the first
// unprocessed value in the current page, i.e. the row after the
// last row touched on a previous call to skip() or
// readWithVisitor(). This is the first row of the first data page
// if first call.
void startVisit(folly::Range<const vector_size_t*> rows);

// Seeks to the next page in a range given by startVisit(). Returns
// true if there are unprocessed rows in the set given to
// startVisit(). Seeks 'this' to the appropriate page and sets
212 changes: 208 additions & 4 deletions velox/dwio/parquet/reader/ParquetData.cpp
@@ -13,19 +13,55 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/parquet/reader/ParquetData.h"
#include <thrift/protocol/TCompactProtocol.h>
#include <iomanip>
#include "velox/dwio/parquet/thrift/ThriftTransport.h"

#include "velox/dwio/common/BufferedInput.h"
#include "velox/dwio/parquet/reader/ParquetStatsContext.h"

namespace facebook::velox::parquet {

namespace {

// Helper methods for EncodingStats analysis, mirroring Presto's Java logic.
bool hasDictionaryPages(const std::vector<thrift::PageEncodingStats>& stats) {
for (const auto& pageStats : stats) {
if (pageStats.page_type == thrift::PageType::DICTIONARY_PAGE) {
return true;
}
}
return false;
}

bool hasNonDictionaryEncodedPages(
const std::vector<thrift::PageEncodingStats>& stats) {
for (const auto& pageStats : stats) {
if (pageStats.page_type == thrift::PageType::DATA_PAGE ||
pageStats.page_type == thrift::PageType::DATA_PAGE_V2) {
// Check if this data page uses non-dictionary encoding
if (pageStats.encoding != thrift::Encoding::PLAIN_DICTIONARY &&
pageStats.encoding != thrift::Encoding::RLE_DICTIONARY) {
return true;
}
}
}
return false;
}

} // namespace

std::unique_ptr<dwio::common::FormatData> ParquetParams::toFormatData(
const std::shared_ptr<const dwio::common::TypeWithId>& type,
const common::ScanSpec& /*scanSpec*/) {
return std::make_unique<ParquetData>(
type, metaData_, pool(), sessionTimezone_);
auto parquetData =
std::make_unique<ParquetData>(type, metaData_, pool(), sessionTimezone_);
// Set the BufferedInput if available
if (bufferedInput_) {
Collaborator: Why is this related to your change? Can this be an independent change?

Contributor Author: This is related; we need bufferedInput_ to read the first page.

Contributor Author: See ParquetData::getInputStream.

parquetData->setBufferedInput(bufferedInput_);
}
return parquetData;
}

void ParquetData::filterRowGroups(
@@ -86,14 +122,29 @@ bool ParquetData::rowGroupMatches(
if (columnChunk.hasStatistics()) {
auto columnStats =
columnChunk.getColumnStatistics(type, rowGroup.numRows());
return testFilter(filter, columnStats.get(), rowGroup.numRows(), type);
bool statisticsResult =
testFilter(filter, columnStats.get(), rowGroup.numRows(), type);
if (!statisticsResult) {
return false;
}
}
bool canUseDictionaryFiltering =
isOnlyDictionaryEncodingPagesImpl(columnChunk);
if (canUseDictionaryFiltering) {
bool dictionaryResult =
testFilterAgainstDictionary(rowGroupId, filter, columnChunk);
if (!dictionaryResult) {
return false;
}
}
return true;
}

void ParquetData::enqueueRowGroup(
uint32_t index,
dwio::common::BufferedInput& input) {
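// Remember the input so getInputStream() can later re-open this column
// chunk for dictionary-based row group filtering.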
bufferedInput_ = &input;

auto chunk = fileMetaDataPtr_.rowGroup(index).columnChunk(type_->column());
streams_.resize(fileMetaDataPtr_.numRowGroups());
VELOX_CHECK(
@@ -150,4 +201,157 @@ std::pair<int64_t, int64_t> ParquetData::getRowGroupRegion(
return {fileOffset, length};
}

// Port of Presto's isOnlyDictionaryEncodingPages (Presto PR #4779).
bool ParquetData::isOnlyDictionaryEncodingPagesImpl(
const ColumnChunkMetaDataPtr& columnChunk) {
// Files written with newer Parquet libraries (e.g. parquet-mr 1.9.0+)
// record EncodingStats; use them when present. Otherwise, fall back to the
// v1 encoding-list logic below.
if (columnChunk.hasEncodingStats()) {
const auto& stats = columnChunk.getEncodingStats();
return hasDictionaryPages(stats) && !hasNonDictionaryEncodedPages(stats);
}

// Fallback to v1 logic
const auto& encodings = columnChunk.getEncodings();
std::set<thrift::Encoding::type> encodingSet(
encodings.begin(), encodings.end());

if (encodingSet.count(thrift::Encoding::PLAIN_DICTIONARY)) {
// PLAIN_DICTIONARY was present, which means at least one page was
// dictionary-encoded and 1.0 encodings are used
// The only other allowed encodings are RLE and BIT_PACKED which are used
// for repetition or definition levels
std::set<thrift::Encoding::type> allowedEncodings = {
thrift::Encoding::PLAIN_DICTIONARY,
thrift::Encoding::RLE, // For repetition/definition levels
thrift::Encoding::BIT_PACKED // For repetition/definition levels
};

// Check if there are any disallowed encodings (equivalent to
// Sets.difference in Java)
for (const auto& encoding : encodings) {
if (allowedEncodings.find(encoding) == allowedEncodings.end()) {
return false;
}
}
return true;
}

return false;
}

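// Tests 'filter' against the values of the column chunk's dictionary.
// Returns true if any dictionary value (or a null, when the filter accepts
// nulls) can pass; returning false lets the caller skip the row group.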
bool ParquetData::testFilterAgainstDictionary(
Contributor Author: Main change, which reads the first page of the column chunk and applies filters.

uint32_t rowGroupId,
const common::Filter* filter,
Collaborator: Why is this a pointer and not a reference?

Contributor Author: It's actually passed from the upstream function, which I didn't change since it was not related to this PR.

const ColumnChunkMetaDataPtr& columnChunk) {
if (filter->kind() == common::FilterKind::kIsNull) {
return true; // Conservative include for IsNull filters
}

auto dictionaryPtr = readDictionaryPageForFiltering(rowGroupId, columnChunk);
if (!dictionaryPtr->values) {
return true; // Conservatively keep the row group when no dictionary was loaded.
}

auto numValues = dictionaryPtr->numValues;
const void* dictPtr = dictionaryPtr->values->as<void>();

// Test if any dictionary value passes the filter
auto testDict = [&]<typename T>() {
const T* dict = reinterpret_cast<const T*>(dictPtr);

// For larger dictionaries, we could use the SIMD testValues() path for
// better performance. For now, a simple scalar loop is sufficient for
// typical dictionary sizes.
for (int32_t i = 0; i < numValues; ++i) {
if (common::applyFilter(*filter, dict[i])) {
return true;
}
}
return false;
};

bool anyValuePasses = [&] {
switch (type_->type()->kind()) {
case TypeKind::BIGINT:
return testDict.operator()<int64_t>();
case TypeKind::INTEGER:
return testDict.operator()<int32_t>();
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
return testDict.operator()<StringView>();
case TypeKind::REAL:
return testDict.operator()<float>();
case TypeKind::DOUBLE:
return testDict.operator()<double>();
case TypeKind::BOOLEAN:
return testDict.operator()<bool>();
default:
return true; // Conservative fallback
}
}();
if (!anyValuePasses && filter->testNull()) {
anyValuePasses = true;
}
return anyValuePasses;
}
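
For intuition, here is a minimal, self-contained sketch of the pruning decision implemented above (toy code, not Velox: ToyRangeFilter and rowGroupCanMatch are invented stand-ins for common::Filter and common::applyFilter). A row group survives when some dictionary value passes the filter, or when the filter accepts nulls.

#include <cstdint>
#include <vector>

// Toy stand-in for a range filter over 64-bit integer values.
struct ToyRangeFilter {
  int64_t lo;
  int64_t hi;
  bool acceptsNull;
  bool test(int64_t v) const {
    return v >= lo && v <= hi;
  }
};

// Mirrors testFilterAgainstDictionary: keep the row group if any dictionary
// entry passes, or if the filter accepts nulls; otherwise it can be skipped.
bool rowGroupCanMatch(
    const std::vector<int64_t>& dict,
    const ToyRangeFilter& f) {
  for (int64_t v : dict) {
    if (f.test(v)) {
      return true;
    }
  }
  return f.acceptsNull;
}

// Example: dictionary {10, 20, 30} against filter [15, 25] matches (20
// passes); against [100, 200] with acceptsNull == false, the whole row
// group can be skipped.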

// Read dictionary page directly for row group filtering (like Presto's
// dictionaryPredicatesMatch)
std::unique_ptr<dwio::common::DictionaryValues>
ParquetData::readDictionaryPageForFiltering(
uint32_t rowGroupId,
const ColumnChunkMetaDataPtr& columnChunk) {
auto inputStream = getInputStream(rowGroupId, columnChunk);
if (!inputStream) {
return std::make_unique<dwio::common::DictionaryValues>();
}

// Create a PageReader positioned at the start of the column chunk.
auto pageReader = std::make_unique<PageReader>(
std::move(inputStream),
pool_,
type_,
columnChunk.compression(),
columnChunk.totalCompressedSize(),
sessionTimezone_);
// Read the first page header to see whether the chunk starts with a
// dictionary page.
auto pageHeader = pageReader->readPageHeader();

// If it's a dictionary page, prepare it
if (pageHeader.type == thrift::PageType::DICTIONARY_PAGE) {
pageReader->prepareDictionary(pageHeader);
} else {
return std::make_unique<dwio::common::DictionaryValues>();
}
const auto& dict = pageReader->dictionary();
return std::make_unique<dwio::common::DictionaryValues>(dict);
}

std::unique_ptr<dwio::common::SeekableInputStream> ParquetData::getInputStream(
uint32_t rowGroupId,
const ColumnChunkMetaDataPtr& columnChunk) {
// Create new stream using the same logic as enqueueRowGroup
if (!bufferedInput_) {
return nullptr;
}

// Calculate read parameters (same as enqueueRowGroup)
uint64_t chunkReadOffset = columnChunk.dataPageOffset();
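// A valid dictionary page offset is at least 4 because every Parquet file
// begins with the 4-byte magic "PAR1".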
if (columnChunk.hasDictionaryPageOffset() &&
columnChunk.dictionaryPageOffset() >= 4) {
chunkReadOffset = columnChunk.dictionaryPageOffset();
}

uint64_t readSize = (columnChunk.compression() ==
common::CompressionKind::CompressionKind_NONE)
? columnChunk.totalUncompressedSize()
: columnChunk.totalCompressedSize();

auto id = dwio::common::StreamIdentifier(type_->column());
auto stream = bufferedInput_->enqueue({chunkReadOffset, readSize}, &id);

bufferedInput_->load(dwio::common::LogType::STRIPE);
return stream;
}

} // namespace facebook::velox::parquet