Skip to content

Commit

Permalink
* NEW [parquet] add fix for parquet read.
Browse files Browse the repository at this point in the history
Signed-off-by: xinyi-xs <lihj@emqx.io>
  • Loading branch information
xinyi-xs committed Jul 19, 2024
1 parent df7f473 commit af2ad2c
Showing 1 changed file with 62 additions and 39 deletions.
101 changes: 62 additions & 39 deletions src/supplemental/nanolib/parquet/parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -996,14 +996,14 @@ get_keys_indexes_fuzing(
int index = 0;
bool found = false;

log_error("start_key: %lu, end_key: %lu", start_key, end_key);
log_debug("start_key: %lu, end_key: %lu", start_key, end_key);

while (int64_reader->HasNext()) {
int64_t value;
rows_read = int64_reader->ReadBatch(1, &definition_level,
&repetition_level, &value, &values_read);
if (1 == rows_read && 1 == values_read) {
log_error("read value: %lu", value);
log_trace("read value: %lu", value);
if (((uint64_t) value) >= start_key) {
index_vector.push_back(index++);
found = true;
Expand All @@ -1022,7 +1022,7 @@ get_keys_indexes_fuzing(
rows_read =
int64_reader->ReadBatch(1, &definition_level,
&repetition_level, &value, &values_read);
log_error("read value: %lu", value);
log_trace("read value: %lu", value);
if (1 == rows_read && 1 == values_read) {
if (((uint64_t) value) > end_key) {
index_vector.push_back(index - 1);
Expand Down Expand Up @@ -1285,8 +1285,8 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
std::shared_ptr<parquet::RowGroupReader>
row_group_reader = parquet_reader->RowGroup(
r); // Get the RowGroup Reader
int64_t values_read = 0;
int64_t rows_read = 0;
int64_t values_read = 0;
int64_t rows_read = 0;
std::shared_ptr<parquet::ColumnReader> column_reader;

column_reader = row_group_reader->Column(0);
Expand All @@ -1297,7 +1297,7 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
index_vector = get_keys_indexes_fuzing(
int64_reader, keys[0], keys[1]);
if (-1 == index_vector[0] || -1 == index_vector[1]) {
log_error("Not found data in key");
log_error("Not found data in key");
ret_vec.push_back(NULL);
return ret_vec;
}
Expand All @@ -1306,7 +1306,8 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
auto ba_reader =
dynamic_pointer_cast<parquet::ByteArrayReader>(
column_reader);
log_error("start index: %lu, end index: %lu", index_vector[0], index_vector[1]);
log_debug("start index: %lu, end index: %lu",
index_vector[0], index_vector[1]);

if (ba_reader->HasNext()) {
ba_reader->Skip(index_vector[0]);
Expand All @@ -1315,46 +1316,66 @@ parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
if (ba_reader->HasNext()) {
int64_t batch_size =
index_vector[1] - index_vector[0] + 1;
std::vector<parquet::ByteArray> values(
batch_size);
std::vector<int16_t> definition_levels(batch_size); // Use a vector for definition levels
parquet::ByteArray value;
rows_read = ba_reader->ReadBatch(batch_size,
definition_levels.data(), nullptr, values.data(),
&values_read);
log_error("batch_size: %lu, rows_read: %lu, values_read: %lu", batch_size, rows_read, values_read);
if (batch_size == rows_read &&
batch_size == values_read) {
for (int64_t b = 0; b < batch_size;
b++) {
int64_t total_values_read = 0;
while (total_values_read < batch_size) {
std::vector<parquet::ByteArray> values(
batch_size);
std::vector<int16_t> definition_levels(
batch_size); // Use a vector for
// definition levels
parquet::ByteArray value;
rows_read = ba_reader->ReadBatch(
batch_size,
definition_levels.data(), nullptr,
values.data(), &values_read);
total_values_read += rows_read;
log_info(
"batch_size: %lu, "
"total_values_read: %lu, "
"rows_read: %lu, values_read: %lu",
batch_size, total_values_read,
rows_read, values_read);
for (int64_t r = 0; r < rows_read;
r++) {
parquet_data_packet *pack =
(parquet_data_packet *)
malloc(sizeof(
parquet_data_packet));
if (!pack) {
log_error("Memory allocation failed for parquet_data_packet");
for (auto p : ret_vec) {
log_error(
"Memory "
"allocation "
"failed "
"for "
"parquet_data_"
"packet");
for (auto p :
ret_vec) {
free(p->data);
free(p);
}
return vector<parquet_data_packet *>();
return vector<
parquet_data_packet
*>();
}
pack->data =
(uint8_t *) malloc(
values[b].len *
values[r].len *
sizeof(uint8_t));
memcpy(pack->data,
values[b].ptr,
values[b].len);
pack->size = values[b].len;
log_error("push element");
values[r].ptr,
values[r].len);
pack->size = values[r].len;
log_trace("push element");
ret_vec.push_back(pack);
}
if (batch_size == total_values_read) {
break;
}
}
} else {
log_error("Next is NULL");
}

log_error("Next is NULL");
}
}

} catch (const std::exception &e) {
Expand Down Expand Up @@ -1390,25 +1411,26 @@ get_key(const char *filename, key_type type)
}

parquet_data_packet **
parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size, char *topic)
parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key,
uint64_t end_key, uint32_t *size, char *topic)
{
vector<parquet_data_packet *> ret_vec;
parquet_data_packet **packets = NULL;
uint32_t len = 0;

log_error("start_key: %lu, end_key: %lu", start_key, end_key);
log_info("start_key: %lu, end_key: %lu", start_key, end_key);
const char **filenames = parquet_find_span(start_key, end_key, &len);

for (uint32_t i = 0; i < len; i++) {
log_info("name: %s, topic: %s", filenames[i], topic);
log_info("name: %s, topic: %s", filenames[i], topic);
if (strstr(filenames[i], topic) == NULL) {
nng_strfree((char*)filenames[i]);
nng_strfree((char *) filenames[i]);
continue;
}

uint64_t keys[2];
keys[0] = start_key;
keys[1] = end_key;
keys[0] = start_key;
keys[1] = end_key;
if (len > 1) {
keys[0] = i == 0 ? start_key
: get_key(filenames[i], START_KEY);
Expand All @@ -1417,12 +1439,13 @@ parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t
: get_key(filenames[i], END_KEY);
}

log_error("file start_key: %lu, file end_key: %lu", keys[0], keys[1]);
log_debug("file start_key: %lu, file end_key: %lu", keys[0],
keys[1]);

auto tmp = parquet_read_span(conf, filenames[i], keys);
log_error("read span size: %ld", tmp.size());
log_debug("read span size: %ld", tmp.size());
ret_vec.insert(ret_vec.end(), tmp.begin(), tmp.end());
nng_strfree((char*)filenames[i]);
nng_strfree((char *) filenames[i]);
}
nng_free(filenames, len);

Expand Down

0 comments on commit af2ad2c

Please sign in to comment.