Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true, optional = true }
tokio = { workspace = true }
Expand Down
136 changes: 133 additions & 3 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::file_groups::FileGroup;
use crate::{
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
source::DataSource, statistics::MinMaxStatistics,
file_stream::work_source::SharedWorkSource, source::DataSource,
statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, Schema, SchemaRef};
Expand All @@ -38,6 +39,7 @@ use datafusion_execution::{
};
use datafusion_expr::Operator;

use crate::source::OpenArgs;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
Expand Down Expand Up @@ -580,6 +582,15 @@ impl DataSource for FileScanConfig {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.open_with_args(OpenArgs::new(partition, context))
}

fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an open_with_args API to mirror other with_args APIs such as TableSource::scan_with_args:

async fn scan_with_args<'a>(
&self,
state: &dyn Session,
args: ScanArgs<'a>,
) -> Result<ScanResult> {
let filters = args.filters().unwrap_or(&[]);
let projection = args.projection().map(|p| p.to_vec());
let limit = args.limit();
let plan = self
.scan(state, projection.as_ref(), filters, limit)
.await?;
Ok(plan.into())
}

The new API was required to pass in the shared state (i.e., to connect sibling streams so they can share and reorder work).

let OpenArgs {
partition,
context,
sibling_state,
} = args;
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
let batch_size = self
.batch_size
Expand All @@ -589,8 +600,17 @@ impl DataSource for FileScanConfig {

let morselizer = source.create_morselizer(object_store, self, partition)?;

// Extract the shared work source from the sibling state if it exists.
// This allows multiple sibling streams to steal work from a single
// shared queue of unopened files.
let shared_work_source = sibling_state
.as_ref()
.and_then(|state| state.downcast_ref::<SharedWorkSource>())
.cloned();

let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_shared_work_source(shared_work_source)
.with_morselizer(morselizer)
.with_metrics(source.metrics())
.build()?;
Expand Down Expand Up @@ -991,6 +1011,20 @@ impl DataSource for FileScanConfig {
// Delegate to the file source
self.file_source.apply_expressions(f)
}

/// Create any shared state that should be passed between sibling streams
/// during one execution.
///
/// Returns `None` when sibling streams must not share work, such as when
/// file order must be preserved or the file groups define the output
/// partitioning needed for the rest of the plan.
fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
    // Work stealing is only safe when neither ordering nor the
    // group-based output partitioning must be preserved.
    let sharing_allowed = !(self.preserve_order || self.partitioned_by_file_group);
    sharing_allowed
        .then(|| Arc::new(SharedWorkSource::from_config(self)) as Arc<dyn Any + Send + Sync>)
}
}

impl FileScanConfig {
Expand Down Expand Up @@ -1368,19 +1402,33 @@ mod tests {

use super::*;
use crate::TableSchema;
use crate::source::DataSourceExec;
use crate::test_util::col;
use crate::{
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
verify_sort_integrity,
};

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::Field;
use datafusion_common::ColumnStatistics;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{Result, assert_batches_eq, internal_err};
use datafusion_execution::TaskContext;
use datafusion_expr::SortExpr;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::create_physical_sort_expr;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::projection::ProjectionExpr;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::execution_plan::collect;
use futures::FutureExt as _;
use futures::StreamExt as _;
use futures::stream;
use object_store::ObjectStore;
use std::fmt::Debug;

#[derive(Clone)]
struct InexactSortPushdownSource {
Expand All @@ -1400,7 +1448,7 @@ mod tests {
impl FileSource for InexactSortPushdownSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn object_store::ObjectStore>,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
Expand Down Expand Up @@ -2288,6 +2336,88 @@ mod tests {
assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
}

/// Regression test for reusing a `DataSourceExec` after its execution-local
/// shared work queue has been drained.
///
/// This test uses a single file group with two files so the scan creates a
/// shared unopened-file queue. Executing after `reset_state` must recreate
/// the shared queue and return the same rows again.
#[tokio::test]
async fn reset_state_recreates_shared_work_source() -> Result<()> {
    // Single-column Int32 schema shared by the opener and the source.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int32,
        false,
    )]));
    let opener = Arc::new(ResetStateTestFileOpener {
        schema: Arc::clone(&schema),
    });
    let file_source = Arc::new(MockSource::new(schema).with_file_opener(opener));

    // Two files in one group so the scan creates a shared unopened-file queue.
    let files = vec![
        PartitionedFile::new("file1.parquet", 100),
        PartitionedFile::new("file2.parquet", 100),
    ];
    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
            .with_file_group(FileGroup::new(files))
            .build();

    let exec: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
    let task_ctx = Arc::new(TaskContext::default());

    // Running the same scan after resetting the state should
    // produce the same answer.
    let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?;
    let second_run = collect(exec.reset_state()?, task_ctx).await?;

    let expected = [
        "+-------+",
        "| value |",
        "+-------+",
        "| 1 |",
        "| 2 |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &first_run);
    assert_batches_eq!(expected, &second_run);

    Ok(())
}

/// Test-only `FileOpener` that turns file names like `file1.parquet` into a
/// single-batch stream containing that numeric value
#[derive(Debug)]
struct ResetStateTestFileOpener {
    // Schema of the single-row batches produced by `open`; expected to be a
    // single non-nullable Int32 column matching the test's scan schema.
    schema: SchemaRef,
}

impl crate::file_stream::FileOpener for ResetStateTestFileOpener {
    fn open(
        &self,
        file: PartitionedFile,
    ) -> Result<crate::file_stream::FileOpenFuture> {
        // Recover the numeric payload from the synthetic file name,
        // e.g. "file2.parquet" -> 2.
        let name = file.object_meta.location.as_ref();
        let value: i32 = name
            .trim_start_matches("file")
            .trim_end_matches(".parquet")
            .parse()
            .expect("invalid test file name");
        let schema = Arc::clone(&self.schema);
        let open_future = async move {
            // One batch holding exactly the parsed value.
            let batch = RecordBatch::try_new(
                schema,
                vec![Arc::new(Int32Array::from(vec![value]))],
            )
            .expect("test batch should be valid");
            Ok(stream::iter([Ok(batch)]).boxed())
        };
        Ok(open_future.boxed())
    }
}

#[test]
fn test_output_partitioning_not_partitioned_by_file_group() {
let file_schema = aggr_test_schema();
Expand Down Expand Up @@ -2476,7 +2606,7 @@ mod tests {
impl FileSource for ExactSortPushdownSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn object_store::ObjectStore>,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
Expand Down
21 changes: 19 additions & 2 deletions datafusion/datasource/src/file_stream/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use crate::file_scan_config::FileScanConfig;
use crate::file_stream::scan_state::ScanState;
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
use crate::morsel::{FileOpenerMorselizer, Morselizer};
use datafusion_common::{Result, internal_err};
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
Expand All @@ -33,17 +34,19 @@ pub struct FileStreamBuilder<'a> {
morselizer: Option<Box<dyn Morselizer>>,
metrics: Option<&'a ExecutionPlanMetricsSet>,
on_error: OnError,
shared_work_source: Option<SharedWorkSource>,
}

impl<'a> FileStreamBuilder<'a> {
/// Create a new builder.
/// Create a new builder for [`FileStream`].
pub fn new(config: &'a FileScanConfig) -> Self {
Self {
config,
partition: None,
morselizer: None,
metrics: None,
on_error: OnError::Fail,
shared_work_source: None,
}
}

Expand Down Expand Up @@ -81,6 +84,15 @@ impl<'a> FileStreamBuilder<'a> {
self
}

/// Configure the [`SharedWorkSource`] for sibling work stealing.
pub(crate) fn with_shared_work_source(
mut self,
shared_work_source: Option<SharedWorkSource>,
) -> Self {
self.shared_work_source = shared_work_source;
self
}

/// Build the configured [`FileStream`].
pub fn build(self) -> Result<FileStream> {
let Self {
Expand All @@ -89,6 +101,7 @@ impl<'a> FileStreamBuilder<'a> {
morselizer,
metrics,
on_error,
shared_work_source,
} = self;

let Some(partition) = partition else {
Expand All @@ -106,10 +119,14 @@ impl<'a> FileStreamBuilder<'a> {
"FileStreamBuilder invalid partition index: {partition}"
);
};
let work_source = match shared_work_source {
Some(shared) => WorkSource::Shared(shared),
None => WorkSource::Local(file_group.into_inner().into()),
};

let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
let scan_state = Box::new(ScanState::new(
file_group.into_inner(),
work_source,
config.limit,
morselizer,
on_error,
Expand Down
Loading
Loading