Skip to content

Commit 1aa7915

Browse files
committed
Support rowgroup skipping based on parquet dict encoding
1 parent 426516c commit 1aa7915

File tree

10 files changed

+616
-36
lines changed

10 files changed

+616
-36
lines changed

velox/connectors/hive/HiveConnectorUtil.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,6 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
426426
subfieldSpecs.clear();
427427
}
428428
}
429-
430429
for (auto& pair : filters) {
431430
const auto name = pair.first.toString();
432431
// SelectiveColumnReader doesn't support constant columns with filters,
@@ -444,7 +443,6 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
444443
VELOX_CHECK_NULL(spec->filter());
445444
fieldSpec->setFilter(pair.second);
446445
}
447-
448446
if (disableStatsBasedFilterReorder) {
449447
spec->disableStatsBasedFilterReorder();
450448
}

velox/connectors/hive/HiveDataSource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,18 @@ HiveDataSource::HiveDataSource(
123123
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters(), infoColumns_);
124124
checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
125125
}
126-
127126
for (const auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
128127
filters_.emplace(k.clone(), v);
129128
}
129+
130130
double sampleRate = 1;
131131
auto remainingFilter = extractFiltersFromRemainingFilter(
132132
hiveTableHandle_->remainingFilter(),
133133
expressionEvaluator_,
134134
false,
135135
filters_,
136136
sampleRate);
137+
137138
if (sampleRate != 1) {
138139
randomSkip_ = std::make_shared<random::RandomSkipTracker>(sampleRate);
139140
}

velox/connectors/hive/SplitReader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ void SplitReader::createRowReader(
386386
RowTypePtr rowType,
387387
std::optional<bool> rowSizeTrackingEnabled) {
388388
VELOX_CHECK_NULL(baseRowReader_);
389+
389390
configureRowReaderOptions(
390391
hiveTableHandle_->tableParameters(),
391392
scanSpec_,

velox/dwio/parquet/reader/Metadata.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,18 @@ ColumnChunkMetaDataPtr::getColumnStatistics(
219219
thriftColumnChunkPtr(ptr_)->meta_data.statistics, *type, numRows);
220220
};
221221

222+
bool ColumnChunkMetaDataPtr::hasEncodingStats() const {
223+
return hasMetadata() && thriftColumnChunkPtr(ptr_)->meta_data.__isset.encoding_stats;
224+
}
225+
226+
const std::vector<thrift::PageEncodingStats>& ColumnChunkMetaDataPtr::getEncodingStats() const {
227+
return thriftColumnChunkPtr(ptr_)->meta_data.encoding_stats;
228+
}
229+
230+
const std::vector<thrift::Encoding::type>& ColumnChunkMetaDataPtr::getEncoding() const {
231+
return thriftColumnChunkPtr(ptr_)->meta_data.encodings;
232+
}
233+
222234
std::string ColumnChunkMetaDataPtr::getColumnMetadataStatsMinValue() {
223235
VELOX_CHECK(hasStatistics());
224236
return thriftColumnChunkPtr(ptr_)->meta_data.statistics.min_value;
@@ -243,6 +255,18 @@ int64_t ColumnChunkMetaDataPtr::dictionaryPageOffset() const {
243255
return thriftColumnChunkPtr(ptr_)->meta_data.dictionary_page_offset;
244256
}
245257

258+
bool ColumnChunkMetaDataPtr::hasBloomFilter() const {
259+
return hasMetadata() &&
260+
thriftColumnChunkPtr(ptr_)->meta_data.__isset.bloom_filter_offset;
261+
}
262+
263+
std::optional<int64_t> ColumnChunkMetaDataPtr::bloom_filter_offset() const {
264+
if (hasBloomFilter()) {
265+
return thriftColumnChunkPtr(ptr_)->meta_data.bloom_filter_offset;
266+
}
267+
return std::nullopt;
268+
}
269+
246270
common::CompressionKind ColumnChunkMetaDataPtr::compression() const {
247271
return thriftCodecToCompressionKind(
248272
thriftColumnChunkPtr(ptr_)->meta_data.codec);

velox/dwio/parquet/reader/Metadata.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "velox/dwio/common/Statistics.h"
2020
#include "velox/dwio/common/compression/Compression.h"
21+
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
2122

2223
namespace facebook::velox::parquet {
2324

@@ -37,6 +38,13 @@ class ColumnChunkMetaDataPtr {
3738
/// Check the presence of the dictionary page offset in ColumnChunk metadata.
3839
bool hasDictionaryPageOffset() const;
3940

41+
bool hasEncodingStats() const;
42+
43+
const std::vector<thrift::PageEncodingStats>& getEncodingStats() const;
44+
45+
const std::vector<thrift::Encoding::type>& getEncoding() const;
46+
47+
4048
/// Return the ColumnChunk statistics.
4149
std::unique_ptr<dwio::common::ColumnStatistics> getColumnStatistics(
4250
const TypePtr type,
@@ -61,6 +69,13 @@ class ColumnChunkMetaDataPtr {
6169
/// Must check for its presence using hasDictionaryPageOffset().
6270
int64_t dictionaryPageOffset() const;
6371

72+
/// Check if bloom filter is available.
73+
bool hasBloomFilter() const;
74+
75+
/// The bloom filter offset.
76+
/// Returns std::nullopt when hasBloomFilter() is false.
77+
std::optional<int64_t> bloom_filter_offset() const;
78+
6479
/// The compression.
6580
common::CompressionKind compression() const;
6681

velox/dwio/parquet/reader/PageReader.h

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,31 @@ class PageReader {
149149
return sessionTimezone_;
150150
}
151151

152+
// Access the loaded dictionary for filtering purposes
153+
const dwio::common::DictionaryValues& dictionary() const {
154+
return dictionary_;
155+
}
156+
157+
// Starts iterating over 'rows', which may span multiple pages. 'rows' are
158+
// relative to current position, with 0 meaning the first
159+
// unprocessed value in the current page, i.e. the row after the
160+
// last row touched on a previous call to skip() or
161+
// readWithVisitor(). This is the first row of the first data page
162+
// if first call.
163+
void startVisit(folly::Range<const vector_size_t*> rows);
164+
165+
// Seeks to page containing 'row'. Returns the number of rows to skip on
166+
// the page to get to 'row'. Clears the state and positions the stream and
167+
// initializes a decoder for the found page. row kRepDefOnly means
168+
// getting repdefs for the next page. If non-top level column, 'row'
169+
// is interpreted in terms of leaf rows, including leaf
170+
// nulls. Seeking ahead of pages covered by decodeRepDefs is not
171+
// allowed for non-top level columns.
172+
void seekToPage(int64_t row);
173+
174+
// Prepares dictionary from a dictionary page header
175+
void prepareDictionary(const thrift::PageHeader& pageHeader);
176+
152177
private:
153178
// Indicates that we only want the repdefs for the next page. Used when
154179
// prereading repdefs with seekToPage.
@@ -174,15 +199,6 @@ class PageReader {
174199
// 'pageData_' + 'encodedDataSize_'.
175200
void makedecoder();
176201

177-
// Reads and skips pages until finding a data page that contains
178-
// 'row'. Reads and sets 'rowOfPage_' and 'numRowsInPage_' and
179-
// initializes a decoder for the found page. row kRepDefOnly means
180-
// getting repdefs for the next page. If non-top level column, 'row'
181-
// is interpreted in terms of leaf rows, including leaf
182-
// nulls. Seeking ahead of pages covered by decodeRepDefs is not
183-
// allowed for non-top level columns.
184-
void seekToPage(int64_t row);
185-
186202
// Preloads the repdefs for the column chunk. To avoid preloading,
187203
// would need a way too clone the input stream so that one stream
188204
// reads ahead for repdefs and the other tracks the data. This is
@@ -202,7 +218,6 @@ class PageReader {
202218

203219
void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row);
204220
void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row);
205-
void prepareDictionary(const thrift::PageHeader& pageHeader);
206221
void makeDecoder();
207222

208223
// For a non-top level leaf, reads the defs and sets 'leafNulls_' and
@@ -230,14 +245,6 @@ class PageReader {
230245
return data;
231246
}
232247

233-
// Starts iterating over 'rows', which may span multiple pages. 'rows' are
234-
// relative to current position, with 0 meaning the first
235-
// unprocessed value in the current page, i.e. the row after the
236-
// last row touched on a previous call to skip() or
237-
// readWithVisitor(). This is the first row of the first data page
238-
// if first call.
239-
void startVisit(folly::Range<const vector_size_t*> rows);
240-
241248
// Seeks to the next page in a range given by startVisit(). Returns
242249
// true if there are unprocessed rows in the set given to
243250
// startVisit(). Seeks 'this' to the appropriate page and sets
@@ -318,6 +325,7 @@ class PageReader {
318325
}
319326
} else {
320327
if (isDictionary()) {
328+
// LOG(DEBUG) << "PageReader: Using StringDictionaryColumnVisitor for row filtering (without nulls) - RowGroup:" << rowGroupIndex_ << " PageOrdinal:" << pageOrdinal_ << " PageIndex:" << pageIndex_;
321329
auto dictVisitor = visitor.toStringDictionaryColumnVisitor();
322330
dictionaryIdDecoder_->readWithVisitor<false>(nullptr, dictVisitor);
323331
} else if (encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY) {

0 commit comments

Comments
 (0)