Skip to content

Commit

Permalink
chore: stop polling the input stream after it is exhausted in the sst writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Mar 24, 2023
1 parent d46717c commit 6585f8c
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion analytic_engine/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct RecordBatchGroupWriter {
request_id: RequestId,
hybrid_encoding: bool,
input: RecordBatchStream,
input_exhausted: bool,
meta_data: MetaData,
num_rows_per_row_group: usize,
max_buffer_size: usize,
Expand Down Expand Up @@ -110,6 +111,10 @@ impl RecordBatchGroupWriter {
continue;
}

if self.input_exhausted {
break;
}

// The previous record batch has been exhausted, so fetch the next record batch.
match self.input.next().await {
Some(v) => {
Expand All @@ -124,7 +129,10 @@ impl RecordBatchGroupWriter {
// fill `curr_row_group`.
prev_record_batch.replace(v);
}
None => break,
None => {
self.input_exhausted = true;
break;
}
};
}

Expand Down Expand Up @@ -271,6 +279,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
hybrid_encoding: self.hybrid_encoding,
request_id,
input,
input_exhausted: false,
num_rows_per_row_group: self.num_rows_per_row_group,
max_buffer_size: self.max_buffer_size,
compression: self.compression,
Expand Down Expand Up @@ -521,6 +530,7 @@ mod tests {
request_id: RequestId::next_id(),
hybrid_encoding: false,
input: record_batch_stream,
input_exhausted: false,
num_rows_per_row_group,
compression: Compression::UNCOMPRESSED,
meta_data: MetaData {
Expand Down

0 comments on commit 6585f8c

Please sign in to comment.