Skip to content
Merged
1 change: 1 addition & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::context::QueryPlanner;
use crate::from_slice::FromSlice;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScan;
Expand Down
14 changes: 11 additions & 3 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
Expand All @@ -41,15 +42,21 @@ pub struct SizedRecordBatchStream {
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
index: usize,
baseline_metrics: BaselineMetrics,
}

impl SizedRecordBatchStream {
/// Create a new RecordBatchIterator
pub fn new(schema: SchemaRef, batches: Vec<Arc<RecordBatch>>) -> Self {
pub fn new(
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
baseline_metrics: BaselineMetrics,
) -> Self {
SizedRecordBatchStream {
schema,
index: 0,
batches,
baseline_metrics,
}
}
}
Expand All @@ -61,12 +68,13 @@ impl Stream for SizedRecordBatchStream {
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(if self.index < self.batches.len() {
let poll = Poll::Ready(if self.index < self.batches.len() {
self.index += 1;
Some(Ok(self.batches[self.index - 1].as_ref().clone()))
} else {
None
})
});
self.baseline_metrics.record_poll(poll)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make metrics right when there is only one record batch.

}
}

Expand Down
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc

use super::SendableRecordBatchStream;
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use async_trait::async_trait;

/// Explain execution plan operator. This operator contains the string
Expand Down Expand Up @@ -146,9 +147,13 @@ impl ExecutionPlan for ExplainExec {
],
)?;

let metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&metrics, partition);

Ok(Box::pin(SizedRecordBatchStream::new(
self.schema.clone(),
vec![Arc::new(record_batch)],
baseline_metrics,
)))
}

Expand Down
Loading