Skip to content

Commit

Permalink
Add SortPreservingMergeExec (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 21, 2021
1 parent 120eccf commit f090a0a
Show file tree
Hide file tree
Showing 4 changed files with 994 additions and 30 deletions.
39 changes: 35 additions & 4 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ use std::fs::metadata;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};

use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::{Stream, TryStreamExt};
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::task::JoinHandle;

use crate::arrow::error::ArrowError;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::ExecutionPlan;

use super::{RecordBatchStream, SendableRecordBatchStream};

/// Stream of record batches
pub struct SizedRecordBatchStream {
Expand Down Expand Up @@ -113,3 +118,29 @@ fn build_file_list_recurse(
}
Ok(())
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
input: Arc<dyn ExecutionPlan>,
mut output: mpsc::Sender<ArrowResult<RecordBatch>>,
partition: usize,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stream = match input.execute(partition).await {
Err(e) => {
// If send fails, plan being torn
// down, no place to send the error
let arrow_error = ArrowError::ExternalError(Box::new(e));
output.send(Err(arrow_error)).await.ok();
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error
output.send(item).await.ok();
}
})
}
29 changes: 3 additions & 26 deletions datafusion/src/physical_plan/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@ use std::any::Any;
use std::sync::Arc;

use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::Stream;

use async_trait::async_trait;

use arrow::record_batch::RecordBatch;
use arrow::{
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
};
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};

use super::RecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};

use super::SendableRecordBatchStream;
use crate::physical_plan::common::spawn_execution;
use pin_project_lite::pin_project;

/// Merge execution plan executes partitions in parallel and combines them into a single
Expand Down Expand Up @@ -121,26 +117,7 @@ impl ExecutionPlan for MergeExec {
// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
for part_i in 0..input_partitions {
let input = self.input.clone();
let mut sender = sender.clone();
tokio::spawn(async move {
let mut stream = match input.execute(part_i).await {
Err(e) => {
// If send fails, plan being torn
// down, no place to send the error
let arrow_error = ArrowError::ExternalError(Box::new(e));
sender.send(Err(arrow_error)).await.ok();
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error
sender.send(item).await.ok();
}
});
spawn_execution(self.input.clone(), sender.clone(), part_i);
}

Ok(Box::pin(MergeStream {
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ pub mod projection;
pub mod regex_expressions;
pub mod repartition;
pub mod sort;
pub mod sort_preserving_merge;
pub mod string_expressions;
pub mod type_coercion;
pub mod udaf;
Expand Down
Loading

0 comments on commit f090a0a

Please sign in to comment.