@@ -82,6 +82,7 @@ pub fn basic_parse() {
8282#[ cfg( test) ]
8383mod test {
8484 use super :: * ;
85+ use datafusion:: execution:: options:: ParquetReadOptions ;
8586 use datafusion:: {
8687 arrow:: {
8788 array:: { ArrayRef , Int32Array , RecordBatch , StringArray } ,
@@ -90,12 +91,16 @@ mod test {
9091 datasource:: MemTable ,
9192 execution:: context:: SessionContext ,
9293 } ;
94+ use datafusion_common:: test_util:: batches_to_string;
9395 use datafusion_execution:: {
9496 config:: SessionConfig , disk_manager:: DiskManagerConfig ,
9597 runtime_env:: RuntimeEnvBuilder ,
9698 } ;
9799 use datafusion_physical_plan:: collect;
98100 use datafusion_sql:: parser:: DFParser ;
101+ use insta:: assert_snapshot;
102+ use object_store:: { memory:: InMemory , path:: Path , ObjectStore } ;
103+ use url:: Url ;
99104 use wasm_bindgen_test:: wasm_bindgen_test;
100105
101106 wasm_bindgen_test:: wasm_bindgen_test_configure!( run_in_browser) ;
@@ -115,6 +120,22 @@ mod test {
115120 let session_config = SessionConfig :: new ( ) . with_target_partitions ( 1 ) ;
116121 Arc :: new ( SessionContext :: new_with_config_rt ( session_config, rt) )
117122 }
123+
124+ fn create_test_data ( ) -> ( Arc < Schema > , RecordBatch ) {
125+ let schema = Arc :: new ( Schema :: new ( vec ! [
126+ Field :: new( "id" , DataType :: Int32 , false ) ,
127+ Field :: new( "value" , DataType :: Utf8 , false ) ,
128+ ] ) ) ;
129+
130+ let data: Vec < ArrayRef > = vec ! [
131+ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
132+ Arc :: new( StringArray :: from( vec![ "a" , "b" , "c" ] ) ) ,
133+ ] ;
134+
135+ let batch = RecordBatch :: try_new ( schema. clone ( ) , data) . unwrap ( ) ;
136+ ( schema, batch)
137+ }
138+
118139 #[ wasm_bindgen_test( unsupported = tokio:: test) ]
119140 async fn basic_execute ( ) {
120141 let sql = "SELECT 2 + 2;" ;
@@ -185,26 +206,56 @@ mod test {
185206
186207 #[ wasm_bindgen_test( unsupported = tokio:: test) ]
187208 async fn test_parquet_write ( ) {
188- let schema = Arc :: new ( Schema :: new ( vec ! [
189- Field :: new( "id" , DataType :: Int32 , false ) ,
190- Field :: new( "value" , DataType :: Utf8 , false ) ,
191- ] ) ) ;
209+ let ( schema, batch) = create_test_data ( ) ;
210+ let mut buffer = Vec :: new ( ) ;
211+ let mut writer = datafusion:: parquet:: arrow:: ArrowWriter :: try_new (
212+ & mut buffer,
213+ schema. clone ( ) ,
214+ None ,
215+ )
216+ . unwrap ( ) ;
192217
193- let data: Vec < ArrayRef > = vec ! [
194- Arc :: new( Int32Array :: from( vec![ 1 ] ) ) ,
195- Arc :: new( StringArray :: from( vec![ "a" ] ) ) ,
196- ] ;
218+ writer. write ( & batch) . unwrap ( ) ;
219+ writer. close ( ) . unwrap ( ) ;
220+ }
197221
198- let batch = RecordBatch :: try_new ( schema. clone ( ) , data) . unwrap ( ) ;
222+ #[ wasm_bindgen_test( unsupported = tokio:: test) ]
223+ async fn test_parquet_read_and_write ( ) {
224+ let ( schema, batch) = create_test_data ( ) ;
199225 let mut buffer = Vec :: new ( ) ;
200226 let mut writer = datafusion:: parquet:: arrow:: ArrowWriter :: try_new (
201227 & mut buffer,
202228 schema. clone ( ) ,
203229 None ,
204230 )
205231 . unwrap ( ) ;
206-
207232 writer. write ( & batch) . unwrap ( ) ;
208233 writer. close ( ) . unwrap ( ) ;
234+
235+ let session_ctx = SessionContext :: new ( ) ;
236+ let store = InMemory :: new ( ) ;
237+
238+ let path = Path :: from ( "a.parquet" ) ;
239+ store. put ( & path, buffer. into ( ) ) . await . unwrap ( ) ;
240+
241+ let url = Url :: parse ( "memory://" ) . unwrap ( ) ;
242+ session_ctx. register_object_store ( & url, Arc :: new ( store) ) ;
243+
244+ let df = session_ctx
245+ . read_parquet ( "memory:///" , ParquetReadOptions :: new ( ) )
246+ . await
247+ . unwrap ( ) ;
248+
249+ let result = df. collect ( ) . await . unwrap ( ) ;
250+
251+ assert_snapshot ! ( batches_to_string( & result) , @r"
252+ +----+-------+
253+ | id | value |
254+ +----+-------+
255+ | 1 | a |
256+ | 2 | b |
257+ | 3 | c |
258+ +----+-------+
259+ " ) ;
209260 }
210261}
0 commit comments