diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 4e3d8169ce..898542daaf 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -66,6 +66,7 @@ struct RecordBatchGroupWriter { request_id: RequestId, hybrid_encoding: bool, input: RecordBatchStream, + input_exhausted: bool, meta_data: MetaData, num_rows_per_row_group: usize, max_buffer_size: usize, @@ -110,6 +111,10 @@ impl RecordBatchGroupWriter { continue; } + if self.input_exhausted { + break; + } + // Previous record batch has been exhausted, and let's fetch next record batch. match self.input.next().await { Some(v) => { @@ -124,7 +129,10 @@ impl RecordBatchGroupWriter { // fill `curr_row_group`. prev_record_batch.replace(v); } - None => break, + None => { + self.input_exhausted = true; + break; + } }; } @@ -271,6 +279,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { hybrid_encoding: self.hybrid_encoding, request_id, input, + input_exhausted: false, num_rows_per_row_group: self.num_rows_per_row_group, max_buffer_size: self.max_buffer_size, compression: self.compression, @@ -521,6 +530,7 @@ mod tests { request_id: RequestId::next_id(), hybrid_encoding: false, input: record_batch_stream, + input_exhausted: false, num_rows_per_row_group, compression: Compression::UNCOMPRESSED, meta_data: MetaData {