@@ -28,8 +28,8 @@ use datafusion::physical_plan::aggregates::{
2828use rand:: rngs:: StdRng ;
2929use rand:: { Rng , SeedableRng } ;
3030
31- use datafusion:: physical_plan:: collect;
3231use datafusion:: physical_plan:: memory:: MemoryExec ;
32+ use datafusion:: physical_plan:: { collect, displayable, ExecutionPlan } ;
3333use datafusion:: prelude:: { SessionConfig , SessionContext } ;
3434use datafusion_physical_expr:: expressions:: { col, Sum } ;
3535use datafusion_physical_expr:: { AggregateExpr , PhysicalSortExpr } ;
@@ -107,6 +107,10 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
107107 . map ( |elem| ( col ( elem, & schema) . unwrap ( ) , elem. to_string ( ) ) )
108108 . collect :: < Vec < _ > > ( ) ;
109109 let group_by = PhysicalGroupBy :: new_single ( expr) ;
110+
111+ println ! ( "aggregate_expr: {aggregate_expr:?}" ) ;
112+ println ! ( "group_by: {group_by:?}" ) ;
113+
110114 let aggregate_exec_running = Arc :: new (
111115 AggregateExec :: try_new (
112116 AggregateMode :: Partial ,
@@ -118,7 +122,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
118122 schema. clone ( ) ,
119123 )
120124 . unwrap ( ) ,
121- ) as _ ;
125+ ) as Arc < dyn ExecutionPlan > ;
122126
123127 let aggregate_exec_usual = Arc :: new (
124128 AggregateExec :: try_new (
@@ -131,14 +135,14 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
131135 schema. clone ( ) ,
132136 )
133137 . unwrap ( ) ,
134- ) as _ ;
138+ ) as Arc < dyn ExecutionPlan > ;
135139
136140 let task_ctx = ctx. task_ctx ( ) ;
137- let collected_usual = collect ( aggregate_exec_usual, task_ctx. clone ( ) )
141+ let collected_usual = collect ( aggregate_exec_usual. clone ( ) , task_ctx. clone ( ) )
138142 . await
139143 . unwrap ( ) ;
140144
141- let collected_running = collect ( aggregate_exec_running, task_ctx. clone ( ) )
145+ let collected_running = collect ( aggregate_exec_running. clone ( ) , task_ctx. clone ( ) )
142146 . await
143147 . unwrap ( ) ;
144148 assert ! ( collected_running. len( ) > 2 ) ;
@@ -162,7 +166,23 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
162166 . zip ( & running_formatted_sorted)
163167 . enumerate ( )
164168 {
165- assert_eq ! ( ( i, usual_line) , ( i, running_line) , "Inconsistent result" ) ;
169+ assert_eq ! (
170+ ( i, usual_line) ,
171+ ( i, running_line) ,
172+ "Inconsistent result\n \n \
173+ Left Plan:\n {}\n \
174+ Right Plan:\n {}\n \
175+ schema:\n {schema}\n \
176+ Left Ouptut:\n {}\n \
177+ Right Output:\n {}\n \
178+ input:\n {}\n \
179+ ",
180+ displayable( aggregate_exec_usual. as_ref( ) ) . indent( false ) ,
181+ displayable( aggregate_exec_running. as_ref( ) ) . indent( false ) ,
182+ usual_formatted,
183+ running_formatted,
184+ pretty_format_batches( & input1) . unwrap( ) ,
185+ ) ;
166186 }
167187}
168188
0 commit comments