@@ -1660,111 +1660,4 @@ mod tests {
         Ok(())
     }
-
-    #[tokio::test]
-    async fn test_memory_reservation_column_parallel() -> Result<()> {
-        async fn test_memory_reservation(global: ParquetOptions) -> Result<()> {
-            let field_a = Field::new("a", DataType::Utf8, false);
-            let field_b = Field::new("b", DataType::Utf8, false);
-            let schema = Arc::new(Schema::new(vec![field_a, field_b]));
-            let object_store_url = ObjectStoreUrl::local_filesystem();
-
-            let file_sink_config = FileSinkConfig {
-                original_url: String::default(),
-                object_store_url: object_store_url.clone(),
-                file_group: FileGroup::new(vec![PartitionedFile::new(
-                    "/tmp".to_string(),
-                    1,
-                )]),
-                table_paths: vec![ListingTableUrl::parse("file:///")?],
-                output_schema: schema.clone(),
-                table_partition_cols: vec![],
-                insert_op: InsertOp::Overwrite,
-                keep_partition_by_columns: false,
-                file_extension: "parquet".into(),
-            };
-            let parquet_sink = Arc::new(ParquetSink::new(
-                file_sink_config,
-                TableParquetOptions {
-                    key_value_metadata: std::collections::HashMap::from([
-                        ("my-data".to_string(), Some("stuff".to_string())),
-                        ("my-data-bool-key".to_string(), None),
-                    ]),
-                    global,
-                    ..Default::default()
-                },
-            ));
-
-            // create data
-            let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
-            let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
-            let batch =
-                RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
-
-            // create task context
-            let task_context = build_ctx(object_store_url.as_ref());
-            assert_eq!(
-                task_context.memory_pool().reserved(),
-                0,
-                "no bytes are reserved yet"
-            );
-
-            let mut write_task = FileSink::write_all(
-                parquet_sink.as_ref(),
-                Box::pin(RecordBatchStreamAdapter::new(
-                    schema,
-                    bounded_stream(batch, 1000),
-                )),
-                &task_context,
-            );
-
-            // incrementally poll and check for memory reservation
-            let mut reserved_bytes = 0;
-            while futures::poll!(&mut write_task).is_pending() {
-                reserved_bytes += task_context.memory_pool().reserved();
-                tokio::time::sleep(Duration::from_micros(1)).await;
-            }
-            assert!(
-                reserved_bytes > 0,
-                "should have bytes reserved during write"
-            );
-            assert_eq!(
-                task_context.memory_pool().reserved(),
-                0,
-                "no leaking byte reservation"
-            );
-
-            Ok(())
-        }
-
-        let write_opts = ParquetOptions {
-            allow_single_file_parallelism: false,
-            ..Default::default()
-        };
-        test_memory_reservation(write_opts)
-            .await
-            .expect("should track for non-parallel writes");
-
-        let row_parallel_write_opts = ParquetOptions {
-            allow_single_file_parallelism: true,
-            maximum_parallel_row_group_writers: 10,
-            maximum_buffered_record_batches_per_stream: 1,
-            ..Default::default()
-        };
-        test_memory_reservation(row_parallel_write_opts)
-            .await
-            .expect("should track for row-parallel writes");
-
-        let col_parallel_write_opts = ParquetOptions {
-            allow_single_file_parallelism: true,
-            maximum_parallel_row_group_writers: 1,
-            maximum_buffered_record_batches_per_stream: 2,
-            ..Default::default()
-        };
-        test_memory_reservation(col_parallel_write_opts)
-            .await
-            .expect("should track for column-parallel writes");
-
-        Ok(())
-    }
 }