@@ -28,8 +28,8 @@ namespace facebook::velox::parquet {
28
28
std::unique_ptr<dwio::common::FormatData> ParquetParams::toFormatData (
29
29
const std::shared_ptr<const dwio::common::TypeWithId>& type,
30
30
const common::ScanSpec& /* scanSpec*/ ) {
31
- auto parquetData = std::make_unique<ParquetData>(
32
- type, metaData_, pool (), sessionTimezone_);
31
+ auto parquetData =
32
+ std::make_unique<ParquetData>( type, metaData_, pool (), sessionTimezone_);
33
33
// Set the BufferedInput if available
34
34
if (bufferedInput_) {
35
35
parquetData->setBufferedInput (bufferedInput_);
@@ -95,27 +95,27 @@ bool ParquetData::rowGroupMatches(
95
95
if (columnChunk.hasStatistics ()) {
96
96
auto columnStats =
97
97
columnChunk.getColumnStatistics (type, rowGroup.numRows ());
98
- bool statisticsResult = testFilter (filter, columnStats.get (), rowGroup.numRows (), type);
98
+ bool statisticsResult =
99
+ testFilter (filter, columnStats.get (), rowGroup.numRows (), type);
99
100
if (!statisticsResult) {
100
101
return false ;
101
102
}
102
103
}
103
-
104
-
105
- bool canUseDictionaryFiltering = isOnlyDictionaryEncodingPagesImpl (columnChunk);
106
- if (canUseDictionaryFiltering) {
107
- bool dictionaryResult = testFilterAgainstDictionary (rowGroupId, filter, columnChunk);
108
- if (!dictionaryResult) {
109
- return false ;
110
- }
104
+ bool canUseDictionaryFiltering =
105
+ isOnlyDictionaryEncodingPagesImpl (columnChunk);
106
+ if (canUseDictionaryFiltering) {
107
+ bool dictionaryResult =
108
+ testFilterAgainstDictionary (rowGroupId, filter, columnChunk);
109
+ if (!dictionaryResult) {
110
+ return false ;
111
111
}
112
- return true ;
112
+ }
113
+ return true ;
113
114
}
114
115
115
116
void ParquetData::enqueueRowGroup (
116
117
uint32_t index,
117
118
dwio::common::BufferedInput& input) {
118
- // Store the BufferedInput reference for creating streams on demand
119
119
bufferedInput_ = &input;
120
120
121
121
auto chunk = fileMetaDataPtr_.rowGroup (index).columnChunk (type_->column ());
@@ -175,9 +175,11 @@ std::pair<int64_t, int64_t> ParquetData::getRowGroupRegion(
175
175
}
176
176
177
177
// Presto's exact isOnlyDictionaryEncodingPages function from PR #4779
178
- bool ParquetData::isOnlyDictionaryEncodingPagesImpl (const ColumnChunkMetaDataPtr& columnChunk) {
179
- // Files written with newer versions of Parquet libraries (e.g. parquet-mr 1.9.0) will have EncodingStats available
180
- // Otherwise, fallback to v1 logic
178
+ bool ParquetData::isOnlyDictionaryEncodingPagesImpl (
179
+ const ColumnChunkMetaDataPtr& columnChunk) {
180
+ // Files written with newer versions of Parquet libraries (e.g.
181
+ // parquet-mr 1.9.0) will have EncodingStats available. Otherwise, fallback to
182
+ // v1 logic
181
183
182
184
// Check for EncodingStats when available (newer Parquet files)
183
185
if (columnChunk.hasEncodingStats ()) {
@@ -187,19 +189,22 @@ bool ParquetData::isOnlyDictionaryEncodingPagesImpl(const ColumnChunkMetaDataPtr
187
189
188
190
// Fallback to v1 logic
189
191
auto encodings = columnChunk.getEncoding ();
190
- std::set<thrift::Encoding::type> encodingSet (encodings.begin (), encodings.end ());
192
+ std::set<thrift::Encoding::type> encodingSet (
193
+ encodings.begin (), encodings.end ());
191
194
192
195
if (encodingSet.count (thrift::Encoding::PLAIN_DICTIONARY)) {
193
196
// PLAIN_DICTIONARY was present, which means at least one page was
194
197
// dictionary-encoded and 1.0 encodings are used
195
- // The only other allowed encodings are RLE and BIT_PACKED which are used for repetition or definition levels
198
+ // The only other allowed encodings are RLE and BIT_PACKED which are used
199
+ // for repetition or definition levels
196
200
std::set<thrift::Encoding::type> allowedEncodings = {
197
- thrift::Encoding::PLAIN_DICTIONARY,
198
- thrift::Encoding::RLE, // For repetition/definition levels
199
- thrift::Encoding::BIT_PACKED // For repetition/definition levels
201
+ thrift::Encoding::PLAIN_DICTIONARY,
202
+ thrift::Encoding::RLE, // For repetition/definition levels
203
+ thrift::Encoding::BIT_PACKED // For repetition/definition levels
200
204
};
201
205
202
- // Check if there are any disallowed encodings (equivalent to Sets.difference in Java)
206
+ // Check if there are any disallowed encodings (equivalent to
207
+ // Sets.difference in Java)
203
208
for (const auto & encoding : encodings) {
204
209
if (allowedEncodings.find (encoding) == allowedEncodings.end ()) {
205
210
return false ;
@@ -212,7 +217,8 @@ bool ParquetData::isOnlyDictionaryEncodingPagesImpl(const ColumnChunkMetaDataPtr
212
217
}
213
218
214
219
// Helper methods for EncodingStats analysis (like Java Presto)
215
- bool ParquetData::hasDictionaryPages (const std::vector<thrift::PageEncodingStats>& stats) {
220
+ bool ParquetData::hasDictionaryPages (
221
+ const std::vector<thrift::PageEncodingStats>& stats) {
216
222
for (const auto & pageStats : stats) {
217
223
if (pageStats.page_type == thrift::PageType::DICTIONARY_PAGE) {
218
224
return true ;
@@ -221,7 +227,8 @@ bool ParquetData::hasDictionaryPages(const std::vector<thrift::PageEncodingStats
221
227
return false ;
222
228
}
223
229
224
- bool ParquetData::hasNonDictionaryEncodedPages (const std::vector<thrift::PageEncodingStats>& stats) {
230
+ bool ParquetData::hasNonDictionaryEncodedPages (
231
+ const std::vector<thrift::PageEncodingStats>& stats) {
225
232
for (const auto & pageStats : stats) {
226
233
if (pageStats.page_type == thrift::PageType::DATA_PAGE ||
227
234
pageStats.page_type == thrift::PageType::DATA_PAGE_V2) {
@@ -239,17 +246,6 @@ bool ParquetData::testFilterAgainstDictionary(
239
246
uint32_t rowGroupId,
240
247
const common::Filter* filter,
241
248
const ColumnChunkMetaDataPtr& columnChunk) {
242
- if (!filter) {
243
- VLOG (3 ) << " [DICT-TEST] No filter provided, dictionary test passes" ;
244
- }
245
-
246
- // Log file path for debugging
247
- std::string filePath = " unknown" ;
248
- if (bufferedInput_ && bufferedInput_->getReadFile ()) {
249
- filePath = bufferedInput_->getReadFile ()->getName ();
250
- }
251
-
252
- // Special handling for IsNull filters - conservative include before dictionary parsing
253
249
if (filter->kind () == common::FilterKind::kIsNull ) {
254
250
return true ; // Conservative include for IsNull filters
255
251
}
@@ -259,48 +255,54 @@ bool ParquetData::testFilterAgainstDictionary(
259
255
return true ;
260
256
}
261
257
262
- // Use the dictionary directly - no temp storage needed
263
258
auto numValues = dictionaryPtr->numValues ;
264
259
const void * dictPtr = dictionaryPtr->values ->as <void >();
265
260
266
261
// Test if any dictionary value passes the filter
267
262
auto testDict = [&]<typename T>() {
268
263
const T* dict = reinterpret_cast <const T*>(dictPtr);
269
264
270
- // For larger dictionaries, we could use SIMD testValues() for better performance
271
- // For now, use simple scalar approach which is sufficient for typical dictionary sizes
265
+ // For larger dictionaries, we could use SIMD testValues() for better
266
+ // performance. For now, use simple scalar approach which is sufficient for
267
+ // typical dictionary sizes
272
268
for (int32_t i = 0 ; i < numValues; ++i) {
273
- if (common::applyFilter (*filter, dict[i])) return true ;
269
+ if (common::applyFilter (*filter, dict[i]))
270
+ return true ;
274
271
}
275
272
return false ;
276
273
};
277
274
278
275
bool anyValuePasses = [&] {
279
276
switch (type_->type ()->kind ()) {
280
- case TypeKind::BIGINT: return testDict.operator ()<int64_t >();
281
- case TypeKind::INTEGER: return testDict.operator ()<int32_t >();
277
+ case TypeKind::BIGINT:
278
+ return testDict.operator ()<int64_t >();
279
+ case TypeKind::INTEGER:
280
+ return testDict.operator ()<int32_t >();
282
281
case TypeKind::VARCHAR:
283
- case TypeKind::VARBINARY: return testDict.operator ()<StringView>();
284
- case TypeKind::REAL: return testDict.operator ()<float >();
285
- case TypeKind::DOUBLE: return testDict.operator ()<double >();
286
- case TypeKind::BOOLEAN: return testDict.operator ()<bool >();
287
- default : return true ; // Conservative fallback
282
+ case TypeKind::VARBINARY:
283
+ return testDict.operator ()<StringView>();
284
+ case TypeKind::REAL:
285
+ return testDict.operator ()<float >();
286
+ case TypeKind::DOUBLE:
287
+ return testDict.operator ()<double >();
288
+ case TypeKind::BOOLEAN:
289
+ return testDict.operator ()<bool >();
290
+ default :
291
+ return true ; // Conservative fallback
288
292
}
289
293
}();
290
-
291
- // If no dictionary values pass the filter, but the filter accepts NULLs,
292
- // we must conservatively include because the row group might contain NULLs that would match
293
294
if (!anyValuePasses && filter->testNull ()) {
294
295
anyValuePasses = true ;
295
296
}
296
297
return anyValuePasses;
297
298
}
298
299
299
- // Read dictionary page directly for row group filtering (like Presto's dictionaryPredicatesMatch)
300
- std::unique_ptr<dwio::common::DictionaryValues> ParquetData::readDictionaryPageForFiltering (
300
+ // Read dictionary page directly for row group filtering (like Presto's
301
+ // dictionaryPredicatesMatch)
302
+ std::unique_ptr<dwio::common::DictionaryValues>
303
+ ParquetData::readDictionaryPageForFiltering (
301
304
uint32_t rowGroupId,
302
305
const ColumnChunkMetaDataPtr& columnChunk) {
303
-
304
306
// Create input stream for the column chunk
305
307
auto inputStream = getInputStream (rowGroupId, columnChunk);
306
308
if (!inputStream) {
@@ -333,7 +335,8 @@ std::unique_ptr<dwio::common::DictionaryValues> ParquetData::readDictionaryPageF
333
335
}
334
336
// Helper method to get input stream for column chunk
335
337
std::unique_ptr<dwio::common::SeekableInputStream> ParquetData::getInputStream (
336
- uint32_t rowGroupId, const ColumnChunkMetaDataPtr& columnChunk) {
338
+ uint32_t rowGroupId,
339
+ const ColumnChunkMetaDataPtr& columnChunk) {
337
340
// Use existing stream if available
338
341
if (rowGroupId < streams_.size () && streams_[rowGroupId]) {
339
342
return std::move (streams_[rowGroupId]);
@@ -346,12 +349,13 @@ std::unique_ptr<dwio::common::SeekableInputStream> ParquetData::getInputStream(
346
349
347
350
// Calculate read parameters (same as enqueueRowGroup)
348
351
uint64_t chunkReadOffset = columnChunk.dataPageOffset ();
349
- if (columnChunk.hasDictionaryPageOffset () && columnChunk.dictionaryPageOffset () >= 4 ) {
352
+ if (columnChunk.hasDictionaryPageOffset () &&
353
+ columnChunk.dictionaryPageOffset () >= 4 ) {
350
354
chunkReadOffset = columnChunk.dictionaryPageOffset ();
351
355
}
352
356
353
- uint64_t readSize =
354
- (columnChunk. compression () == common::CompressionKind::CompressionKind_NONE)
357
+ uint64_t readSize = (columnChunk. compression () ==
358
+ common::CompressionKind::CompressionKind_NONE)
355
359
? columnChunk.totalUncompressedSize ()
356
360
: columnChunk.totalCompressedSize ();
357
361
0 commit comments