@@ -61,7 +61,7 @@ mod store;
 pub use store::*;
 
 use crate::{
-    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::{metadata::RowGroupMetaData, properties::WriterProperties},
@@ -288,42 +288,17 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
 
         Ok(())
     }
-
-    /// Create a new row group writer and return its column writers.
-    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
-        let before = self.sync_writer.flushed_row_groups().len();
-        // TODO: should use the new API
-        #[allow(deprecated)]
-        let writers = self.sync_writer.get_column_writers()?;
-        // let (serialized_file_writer, arrow_row_group_writer_factory) =
-        //     self.sync_writer.into_serialized_writer().unwrap();
-        // let writers = row_group_writer_factory.create_column_writers(0).unwrap();
-        // let metadata = serialized_file_writer.close().unwrap();
-        if before != self.sync_writer.flushed_row_groups().len() {
-            self.do_write().await?;
-        }
-        Ok(writers)
-    }
-
-    /// Append the given column chunks to the file as a new row group.
-    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
-        // TODO: should use the new API
-        #[allow(deprecated)]
-        self.sync_writer.append_row_group(chunks)?;
-        self.do_write().await
-    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+    use crate::arrow::arrow_writer::compute_leaves;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
     use bytes::Bytes;
     use std::sync::Arc;
 
-    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
-    use crate::arrow::arrow_writer::compute_leaves;
-
     use super::*;
 
     fn get_test_reader() -> ParquetRecordBatchReader {
@@ -369,7 +344,13 @@ mod tests {
         // Use classic API
         writer.write(&to_write_record).await.unwrap();
 
-        let mut writers = writer.get_column_writers().await.unwrap();
+        // Use low-level API to write an Arrow group
+        let arrow_writer = writer.sync_writer;
+        let (mut serialized_file_writer, row_group_writer_factory) =
+            arrow_writer.into_serialized_writer().unwrap();
+
+        // Get column writers
+        let mut writers = row_group_writer_factory.create_column_writers(0).unwrap();
         let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
         let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();
 
@@ -384,10 +365,13 @@ mod tests {
             }
         }
 
-        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
+        let mut columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
         // Append the arrow group as a new row group. Flush in progress
-        writer.append_row_group(columns).await.unwrap();
-        writer.close().await.unwrap();
+        let mut row_group_writer = serialized_file_writer.next_row_group().unwrap();
+        let chunk = columns.remove(0);
+        chunk.append_to_row_group(&mut row_group_writer).unwrap();
+        row_group_writer.close().unwrap();
+        let _metadata = serialized_file_writer.close().unwrap();
 
         let buffer = Bytes::from(buffer);
         let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
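For reference, here is a minimal, self-contained sketch (not part of this commit) of the low-level write path the updated test exercises: split an ArrowWriter into a SerializedFileWriter plus a row-group writer factory, encode leaf columns, and append the resulting column chunks as one row group. It relies only on the into_serialized_writer, create_column_writers, next_row_group, and append_to_row_group calls shown in the diff above; the write_one_row_group helper name and the overall wiring are illustrative, not the crate's documented example.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::arrow_writer::compute_leaves;
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

/// Encode a small single-column batch as one row group using the low-level API.
fn write_one_row_group(buffer: &mut Vec<u8>) -> Result<()> {
    // Batch to encode as one row group.
    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    let schema = batch.schema();

    // Start from the high-level ArrowWriter, then split it into the low-level
    // SerializedFileWriter plus a factory of per-column Arrow writers.
    let writer = ArrowWriter::try_new(buffer, schema.clone(), None)?;
    let (mut file_writer, row_group_factory) = writer.into_serialized_writer()?;
    let mut column_writers = row_group_factory.create_column_writers(0)?;

    // Encode each leaf column with its matching ArrowColumnWriter.
    for ((field, array), col_writer) in schema
        .fields()
        .iter()
        .zip(batch.columns())
        .zip(column_writers.iter_mut())
    {
        for leaves in compute_leaves(field, array)? {
            col_writer.write(&leaves)?;
        }
    }

    // Close the column writers into column chunks, append them to a freshly
    // opened row group, then finalize the file.
    let chunks = column_writers
        .into_iter()
        .map(|w| w.close())
        .collect::<Result<Vec<_>>>()?;
    let mut row_group = file_writer.next_row_group()?;
    for chunk in chunks {
        chunk.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    file_writer.close()?;
    Ok(())
}

fn main() -> Result<()> {
    let mut buf = Vec::new();
    write_one_row_group(&mut buf)?;
    println!("wrote {} bytes of Parquet", buf.len());
    Ok(())
}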