| 
15 | 15 | // specific language governing permissions and limitations  | 
16 | 16 | // under the License.  | 
17 | 17 | 
 
  | 
18 |  | -//! Contains asynchronous APIs for reading parquet files into  | 
19 |  | -//! arrow [`RecordBatch`]  | 
 | 18 | +//! Provides an `async` API for reading parquet files as  | 
 | 19 | +//! [`RecordBatch`]es  | 
 | 20 | +//!  | 
 | 21 | +//! ```  | 
 | 22 | +//! # #[tokio::main(flavor="current_thread")]  | 
 | 23 | +//! # async fn main() {  | 
 | 24 | +//! #  | 
 | 25 | +//! use arrow::record_batch::RecordBatch;  | 
 | 26 | +//! use arrow::util::pretty::pretty_format_batches;  | 
 | 27 | +//! use futures::TryStreamExt;  | 
 | 28 | +//! use tokio::fs::File;  | 
 | 29 | +//!  | 
 | 30 | +//! use parquet::arrow::ParquetRecordBatchStreamBuilder;  | 
 | 31 | +//!  | 
 | 32 | +//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {  | 
 | 33 | +//! #     let formatted = pretty_format_batches(batches).unwrap().to_string();  | 
 | 34 | +//! #     let actual_lines: Vec<_> = formatted.trim().lines().collect();  | 
 | 35 | +//! #     assert_eq!(  | 
 | 36 | +//! #          &actual_lines, expected_lines,  | 
 | 37 | +//! #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",  | 
 | 38 | +//! #          expected_lines, actual_lines  | 
 | 39 | +//! #      );  | 
 | 40 | +//! #  }  | 
 | 41 | +//!  | 
 | 42 | +//! let testdata = arrow::util::test_util::parquet_test_data();  | 
 | 43 | +//! let path = format!("{}/alltypes_plain.parquet", testdata);  | 
 | 44 | +//! let file = File::open(path).await.unwrap();  | 
 | 45 | +//!  | 
 | 46 | +//! let builder = ParquetRecordBatchStreamBuilder::new(file)  | 
 | 47 | +//!     .await  | 
 | 48 | +//!     .unwrap()  | 
 | 49 | +//!     .with_projection(vec![1, 2, 6])  | 
 | 50 | +//!     .with_batch_size(3);  | 
 | 51 | +//!  | 
 | 52 | +//! let stream = builder.build().unwrap();  | 
 | 53 | +//!  | 
 | 54 | +//! let results = stream.try_collect::<Vec<_>>().await.unwrap();  | 
 | 55 | +//! assert_eq!(results.len(), 3);  | 
 | 56 | +//!  | 
 | 57 | +//! assert_batches_eq(  | 
 | 58 | +//!     &results,  | 
 | 59 | +//!     &[  | 
 | 60 | +//!         "+----------+-------------+-----------+",  | 
 | 61 | +//!         "| bool_col | tinyint_col | float_col |",  | 
 | 62 | +//!         "+----------+-------------+-----------+",  | 
 | 63 | +//!         "| true     | 0           | 0         |",  | 
 | 64 | +//!         "| false    | 1           | 1.1       |",  | 
 | 65 | +//!         "| true     | 0           | 0         |",  | 
 | 66 | +//!         "| false    | 1           | 1.1       |",  | 
 | 67 | +//!         "| true     | 0           | 0         |",  | 
 | 68 | +//!         "| false    | 1           | 1.1       |",  | 
 | 69 | +//!         "| true     | 0           | 0         |",  | 
 | 70 | +//!         "| false    | 1           | 1.1       |",  | 
 | 71 | +//!         "+----------+-------------+-----------+",  | 
 | 72 | +//!     ],  | 
 | 73 | +//! );  | 
 | 74 | +//! # }  | 
 | 75 | +//! ```  | 
20 | 76 | 
  | 
21 | 77 | use std::collections::VecDeque;  | 
22 | 78 | use std::fmt::Formatter;  | 
@@ -425,58 +481,3 @@ impl PageIterator for ColumnChunkIterator {  | 
425 | 481 |         Ok(self.column_schema.clone())  | 
426 | 482 |     }  | 
427 | 483 | }  | 
428 |  | - | 
429 |  | -#[cfg(test)]  | 
430 |  | -mod tests {  | 
431 |  | -    use arrow::util::pretty::pretty_format_batches;  | 
432 |  | -    use futures::TryStreamExt;  | 
433 |  | -    use tokio::fs::File;  | 
434 |  | - | 
435 |  | -    use super::*;  | 
436 |  | - | 
437 |  | -    fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {  | 
438 |  | -        let formatted = pretty_format_batches(batches).unwrap().to_string();  | 
439 |  | -        let actual_lines: Vec<_> = formatted.trim().lines().collect();  | 
440 |  | -        assert_eq!(  | 
441 |  | -            &actual_lines, expected_lines,  | 
442 |  | -            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",  | 
443 |  | -            expected_lines, actual_lines  | 
444 |  | -        );  | 
445 |  | -    }  | 
446 |  | - | 
447 |  | -    #[tokio::test]  | 
448 |  | -    async fn test_parquet_stream() {  | 
449 |  | -        let testdata = arrow::util::test_util::parquet_test_data();  | 
450 |  | -        let path = format!("{}/alltypes_plain.parquet", testdata);  | 
451 |  | -        let file = File::open(path).await.unwrap();  | 
452 |  | - | 
453 |  | -        let builder = ParquetRecordBatchStreamBuilder::new(file)  | 
454 |  | -            .await  | 
455 |  | -            .unwrap()  | 
456 |  | -            .with_projection(vec![1, 2, 6])  | 
457 |  | -            .with_batch_size(3);  | 
458 |  | - | 
459 |  | -        let stream = builder.build().unwrap();  | 
460 |  | - | 
461 |  | -        let results = stream.try_collect::<Vec<_>>().await.unwrap();  | 
462 |  | -        assert_eq!(results.len(), 3);  | 
463 |  | - | 
464 |  | -        assert_batches_eq(  | 
465 |  | -            &results,  | 
466 |  | -            &[  | 
467 |  | -                "+----------+-------------+-----------+",  | 
468 |  | -                "| bool_col | tinyint_col | float_col |",  | 
469 |  | -                "+----------+-------------+-----------+",  | 
470 |  | -                "| true     | 0           | 0         |",  | 
471 |  | -                "| false    | 1           | 1.1       |",  | 
472 |  | -                "| true     | 0           | 0         |",  | 
473 |  | -                "| false    | 1           | 1.1       |",  | 
474 |  | -                "| true     | 0           | 0         |",  | 
475 |  | -                "| false    | 1           | 1.1       |",  | 
476 |  | -                "| true     | 0           | 0         |",  | 
477 |  | -                "| false    | 1           | 1.1       |",  | 
478 |  | -                "+----------+-------------+-----------+",  | 
479 |  | -            ],  | 
480 |  | -        );  | 
481 |  | -    }  | 
482 |  | -}  | 
 | 
0 commit comments