@@ -24,12 +24,13 @@ use crate::serde::scheduler::PartitionLocation;
2424
2525use arrow:: datatypes:: SchemaRef ;
2626use async_trait:: async_trait;
27- use datafusion:: physical_plan:: { ExecutionPlan , Partitioning } ;
27+ use datafusion:: physical_plan:: { DisplayFormatType , ExecutionPlan , Partitioning } ;
2828use datafusion:: {
2929 error:: { DataFusionError , Result } ,
3030 physical_plan:: RecordBatchStream ,
3131} ;
3232use log:: info;
33+ use std:: fmt:: Formatter ;
3334
3435/// ShuffleReaderExec reads partitions that have already been materialized by an executor.
3536#[ derive( Debug , Clone ) ]
@@ -103,4 +104,31 @@ impl ExecutionPlan for ShuffleReaderExec {
103104 . await
104105 . map_err ( |e| DataFusionError :: Execution ( format ! ( "Ballista Error: {:?}" , e) ) )
105106 }
107+
108+ fn fmt_as (
109+ & self ,
110+ t : DisplayFormatType ,
111+ f : & mut std:: fmt:: Formatter ,
112+ ) -> std:: fmt:: Result {
113+ match t {
114+ DisplayFormatType :: Default => {
115+ let loc_str = self
116+ . partition_location
117+ . iter ( )
118+ . map ( |l| {
119+ format ! (
120+ "[executor={} part={}:{}:{} stats={:?}]" ,
121+ l. executor_meta. id,
122+ l. partition_id. job_id,
123+ l. partition_id. stage_id,
124+ l. partition_id. partition_id,
125+ l. partition_stats
126+ )
127+ } )
128+ . collect :: < Vec < String > > ( )
129+ . join ( "," ) ;
130+ write ! ( f, "ShuffleReaderExec: partition_locations={}" , loc_str)
131+ }
132+ }
133+ }
106134}
0 commit comments