Parquet detect: CHECK fails (#7551)
* Fix issue where `\detect` for Parquet files fails in `heavysql` for array types.

* Correct a bug where arrays were incorrectly counted in detect (see the sketch below).

* Parquet detect now uses 0 for the value of `comp_param` to be compatible
with `heavysql` expectations.

Signed-off-by: Misiu Godfrey <misiu.godfrey@kraken.mapd.com>
mattgara authored and misiugodfrey committed Aug 26, 2024
1 parent 4a30ab1 commit 5d86d06
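
For context, here is a minimal standalone illustration (hypothetical example, not part of this commit) of the miscount the second bullet refers to: when reading an array column, one logical row contributes one definition/repetition level per element, so a running count of levels read is not a count of rows read.

// illustration.cpp -- hypothetical, not HeavyDB code.
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Two logical rows of an array column: {1, 2, 3} and {4}.
  const std::vector<std::vector<int32_t>> rows = {{1, 2, 3}, {4}};

  int64_t levels_read = 0;  // what the old code accumulated toward the detect row cap
  int64_t rows_read = 0;    // what detect actually needs to cap
  for (const auto& row : rows) {
    levels_read += static_cast<int64_t>(row.size());  // one level per array element
    ++rows_read;                                      // one row per top-level array
  }
  std::cout << "levels_read=" << levels_read << " rows_read=" << rows_read << "\n";
  // Prints: levels_read=4 rows_read=2 -- counting levels as rows overshoots for arrays,
  // which is why the change below asks the array detect encoder for the array count instead.
  return 0;
}
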
Showing 2 changed files with 23 additions and 8 deletions.
29 changes: 21 additions & 8 deletions DataMgr/ForeignStorage/LazyParquetChunkLoader.cpp
@@ -1482,7 +1482,10 @@ SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor* parquet_colu
   SQLTypeInfo type;
   type.set_type(kTEXT);
   type.set_compression(kENCODING_DICT);
-  type.set_comp_param(32);
+  type.set_comp_param(0);  // `comp_param` is expected either to be zero or
+                           // equal to a string dictionary id in some code
+                           // paths, since we don't have a string dictionary we
+                           // set this to zero
   type.set_fixed_size();
   return type;
 }
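
As an aside, a rough sketch (hypothetical helper, not from this codebase) of the expectation the new comment describes: for a dictionary-encoded TEXT column, `comp_param` should be either zero or the id of an actual string dictionary, which appears to be why the previously hard-coded 32 conflicted with `heavysql` expectations during detect.

// Hypothetical check mirroring the invariant described in the comment above:
// comp_param is either 0 (no dictionary assigned yet) or a known dictionary id.
#include <set>

bool is_plausible_dict_comp_param(const int comp_param,
                                  const std::set<int>& known_dictionary_ids) {
  return comp_param == 0 || known_dictionary_ids.count(comp_param) > 0;
}
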
@@ -1829,7 +1832,7 @@ std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroup
     StringDictionary* string_dictionary,
     RejectedRowIndices* rejected_row_indices,
     const bool is_for_detect,
-    const std::optional<int64_t> max_levels_read) {
+    const std::optional<int64_t> max_rows_to_read) {
   auto timer = DEBUG_TIMER(__func__);
   std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
   // `def_levels` and `rep_levels` below are used to store the read definition
@@ -1866,7 +1869,7 @@ std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroup
   encoder->initializeColumnType(column_descriptor->columnType);
 
   bool early_exit = false;
-  int64_t total_levels_read = 0;
+  int64_t total_rows_read = 0;
   for (const auto& row_group_interval : row_group_intervals) {
     const auto& file_path = row_group_interval.file_path;
     auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
@@ -1929,9 +1932,19 @@ std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroup
                             values.data());
        }
 
-        if (max_levels_read.has_value()) {
-          total_levels_read += levels_read;
-          if (total_levels_read >= max_levels_read.value()) {
+        if (max_rows_to_read.has_value()) {
+          if (column_descriptor->columnType.is_array()) {
+            auto array_encoder =
+                dynamic_cast<ParquetArrayDetectEncoder*>(encoder.get());
+            CHECK(array_encoder);
+            total_rows_read = array_encoder->getArraysCount();
+          } else {
+            // For scalar types it is safe to assume the number of levels read is equal
+            // to the number of rows read
+            total_rows_read += levels_read;
+          }
+
+          if (total_rows_read >= max_rows_to_read.value()) {
            early_exit = true;
            break;
          }
@@ -1959,11 +1972,11 @@ std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroup
              ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
              "', Parquet file: '" + file_path + "'");
       }
-      if (max_levels_read.has_value() && early_exit) {
+      if (max_rows_to_read.has_value() && early_exit) {
         break;
       }
     }
-    if (max_levels_read.has_value() && early_exit) {
+    if (max_rows_to_read.has_value() && early_exit) {
       break;
     }
   }
2 changes: 2 additions & 0 deletions DataMgr/ForeignStorage/ParquetArrayDetectEncoder.h
@@ -48,6 +48,8 @@ class ParquetArrayDetectEncoder : public ParquetArrayEncoder {
     updateMetadataForAppendedArrayItem(encoded_index);
   }
 
+  size_t getArraysCount() const { return detect_buffer_->getStrings().size(); }
+
  protected:
   void encodeAllValues(const int8_t* values, const int64_t values_read) override {
     if (!is_string_array_) {
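
A simplified model (hypothetical types, abbreviated from the real classes) of the relationship the new accessor relies on: the detect encoder appears to buffer one string per completed top-level array, so the buffer's string count doubles as the number of arrays, i.e. rows, encoded so far.

// Simplified, hypothetical sketch -- not the actual HeavyDB classes.
#include <cstddef>
#include <string>
#include <vector>

struct DetectBufferSketch {
  std::vector<std::string> strings;
  const std::vector<std::string>& getStrings() const { return strings; }
};

struct ArrayDetectEncoderSketch {
  DetectBufferSketch* detect_buffer = nullptr;

  // Called once per finished top-level array during detect.
  void appendArrayAsText(const std::string& text) { detect_buffer->strings.push_back(text); }

  // Mirrors the spirit of the new getArraysCount(): arrays (rows) encoded so far.
  std::size_t getArraysCount() const { return detect_buffer->getStrings().size(); }
};
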
