From af2ad2cd68b9c15e5238b466ba12df2114998e28 Mon Sep 17 00:00:00 2001 From: xinyi-xs Date: Fri, 19 Jul 2024 10:38:08 +0800 Subject: [PATCH] * NEW [parquet] add fix for parquet read. Signed-off-by: xinyi-xs --- src/supplemental/nanolib/parquet/parquet.cc | 101 ++++++++++++-------- 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/src/supplemental/nanolib/parquet/parquet.cc b/src/supplemental/nanolib/parquet/parquet.cc index f455e7c2b..b9634abd6 100644 --- a/src/supplemental/nanolib/parquet/parquet.cc +++ b/src/supplemental/nanolib/parquet/parquet.cc @@ -996,14 +996,14 @@ get_keys_indexes_fuzing( int index = 0; bool found = false; - log_error("start_key: %lu, end_key: %lu", start_key, end_key); + log_debug("start_key: %lu, end_key: %lu", start_key, end_key); while (int64_reader->HasNext()) { int64_t value; rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, &value, &values_read); if (1 == rows_read && 1 == values_read) { - log_error("read value: %lu", value); + log_trace("read value: %lu", value); if (((uint64_t) value) >= start_key) { index_vector.push_back(index++); found = true; @@ -1022,7 +1022,7 @@ get_keys_indexes_fuzing( rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, &value, &values_read); - log_error("read value: %lu", value); + log_trace("read value: %lu", value); if (1 == rows_read && 1 == values_read) { if (((uint64_t) value) > end_key) { index_vector.push_back(index - 1); @@ -1285,8 +1285,8 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2]) std::shared_ptr row_group_reader = parquet_reader->RowGroup( r); // Get the RowGroup Reader - int64_t values_read = 0; - int64_t rows_read = 0; + int64_t values_read = 0; + int64_t rows_read = 0; std::shared_ptr column_reader; column_reader = row_group_reader->Column(0); @@ -1297,7 +1297,7 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2]) index_vector = get_keys_indexes_fuzing( int64_reader, keys[0], keys[1]); if (-1 == index_vector[0] || -1 == index_vector[1]) { - log_error("Not found data in key"); + log_error("Not found data in key"); ret_vec.push_back(NULL); return ret_vec; } @@ -1306,7 +1306,8 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2]) auto ba_reader = dynamic_pointer_cast( column_reader); - log_error("start index: %lu, end index: %lu", index_vector[0], index_vector[1]); + log_debug("start index: %lu, end index: %lu", + index_vector[0], index_vector[1]); if (ba_reader->HasNext()) { ba_reader->Skip(index_vector[0]); @@ -1315,46 +1316,66 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2]) if (ba_reader->HasNext()) { int64_t batch_size = index_vector[1] - index_vector[0] + 1; - std::vector values( - batch_size); - std::vector definition_levels(batch_size); // Use a vector for definition levels - parquet::ByteArray value; - rows_read = ba_reader->ReadBatch(batch_size, - definition_levels.data(), nullptr, values.data(), - &values_read); - log_error("batch_size: %lu, rows_read: %lu, values_read: %lu", batch_size, rows_read, values_read); - if (batch_size == rows_read && - batch_size == values_read) { - for (int64_t b = 0; b < batch_size; - b++) { + int64_t total_values_read = 0; + while (total_values_read < batch_size) { + std::vector values( + batch_size); + std::vector definition_levels( + batch_size); // Use a vector for + // definition levels + parquet::ByteArray value; + rows_read = ba_reader->ReadBatch( + batch_size, + definition_levels.data(), nullptr, + values.data(), &values_read); + total_values_read += rows_read; + log_info( + "batch_size: %lu, " + "total_values_read: %lu, " + "rows_read: %lu, values_read: %lu", + batch_size, total_values_read, + rows_read, values_read); + for (int64_t r = 0; r < rows_read; + r++) { parquet_data_packet *pack = (parquet_data_packet *) malloc(sizeof( parquet_data_packet)); if (!pack) { - log_error("Memory allocation failed for parquet_data_packet"); - for (auto p : ret_vec) { + log_error( + "Memory " + "allocation " + "failed " + "for " + "parquet_data_" + "packet"); + for (auto p : + ret_vec) { free(p->data); free(p); } - return vector(); + return vector< + parquet_data_packet + *>(); } pack->data = (uint8_t *) malloc( - values[b].len * + values[r].len * sizeof(uint8_t)); memcpy(pack->data, - values[b].ptr, - values[b].len); - pack->size = values[b].len; - log_error("push element"); + values[r].ptr, + values[r].len); + pack->size = values[r].len; + log_trace("push element"); ret_vec.push_back(pack); } + if (batch_size == total_values_read) { + break; + } } } else { - log_error("Next is NULL"); - } - + log_error("Next is NULL"); + } } } catch (const std::exception &e) { @@ -1390,25 +1411,26 @@ get_key(const char *filename, key_type type) } parquet_data_packet ** -parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size, char *topic) +parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, + uint64_t end_key, uint32_t *size, char *topic) { vector ret_vec; parquet_data_packet **packets = NULL; uint32_t len = 0; - log_error("start_key: %lu, end_key: %lu", start_key, end_key); + log_info("start_key: %lu, end_key: %lu", start_key, end_key); const char **filenames = parquet_find_span(start_key, end_key, &len); for (uint32_t i = 0; i < len; i++) { - log_info("name: %s, topic: %s", filenames[i], topic); + log_info("name: %s, topic: %s", filenames[i], topic); if (strstr(filenames[i], topic) == NULL) { - nng_strfree((char*)filenames[i]); + nng_strfree((char *) filenames[i]); continue; } uint64_t keys[2]; - keys[0] = start_key; - keys[1] = end_key; + keys[0] = start_key; + keys[1] = end_key; if (len > 1) { keys[0] = i == 0 ? start_key : get_key(filenames[i], START_KEY); @@ -1417,12 +1439,13 @@ parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t : get_key(filenames[i], END_KEY); } - log_error("file start_key: %lu, file end_key: %lu", keys[0], keys[1]); + log_debug("file start_key: %lu, file end_key: %lu", keys[0], + keys[1]); auto tmp = parquet_read_span(conf, filenames[i], keys); - log_error("read span size: %ld", tmp.size()); + log_debug("read span size: %ld", tmp.size()); ret_vec.insert(ret_vec.end(), tmp.begin(), tmp.end()); - nng_strfree((char*)filenames[i]); + nng_strfree((char *) filenames[i]); } nng_free(filenames, len);