Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use vortex_bench::Format;
use vortex_bench::SESSION;
use vortex_datafusion::VortexFormat;
use vortex_datafusion::VortexFormatFactory;
use vortex_datafusion::VortexOptions;

#[allow(clippy::expect_used)]
pub fn get_session_context() -> SessionContext {
Expand All @@ -44,7 +45,10 @@ pub fn get_session_context() -> SessionContext {
.build_arc()
.expect("could not build runtime environment");

let factory = VortexFormatFactory::new();
let factory = VortexFormatFactory::new().with_options(VortexOptions {
projection_pushdown: true,
..Default::default()
});

let mut session_state_builder = SessionStateBuilder::new()
.with_config(SessionConfig::from_env().expect("shouldn't fail"))
Expand Down
60 changes: 60 additions & 0 deletions vortex-datafusion/src/convert/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,32 @@ pub trait ExpressionConvertor: Send + Sync {
input_schema: &Schema,
output_schema: &Schema,
) -> DFResult<ProcessedProjection>;

/// Create a projection that reads only the required columns without pushing down
/// any expressions. All projection logic is applied after the scan.
fn no_pushdown_projection(
    &self,
    source_projection: ProjectionExprs,
    input_schema: &Schema,
) -> DFResult<ProcessedProjection> {
    // Build one scan column per referenced index, each reading the raw column value.
    let mut scan_columns: Vec<(String, Expression)> = Vec::new();
    for idx in source_projection.column_indices() {
        let column_name = input_schema.field(idx).name().clone();
        scan_columns.push((column_name.clone(), get_item(column_name, root())));
    }

    // The scan returns raw columns only; the full source projection is kept as
    // leftover work to be evaluated after the scan.
    Ok(ProcessedProjection {
        scan_projection: pack(scan_columns, Nullability::NonNullable),
        leftover_projection: source_projection,
    })
}
}

/// The default [`ExpressionConvertor`].
Expand Down Expand Up @@ -444,6 +470,7 @@ mod tests {
use rstest::rstest;

use super::*;
use crate::common_tests::TestSessionContext;

#[rstest::fixture]
fn test_schema() -> Schema {
Expand Down Expand Up @@ -747,4 +774,37 @@ mod tests {

assert!(!can_be_pushed_down_impl(&like_expr, &test_schema));
}

// Regression test for https://github.com/vortex-data/vortex/issues/6211:
// a cast expression pushed into the Vortex scan failed to execute.
#[tokio::test]
async fn test_cast_int_to_string() -> anyhow::Result<()> {
    // Default test context disables projection pushdown (TestSessionContext::new(false)).
    let ctx = TestSessionContext::default();

    // Write a single-row file with one integer column to query against.
    ctx.session
        .sql(r#"copy (select 1 as id) to 'example.vortex'"#)
        .await?
        .show()
        .await?;

    // Cast in the projection combined with a filter on the raw column.
    ctx.session
        .sql(r#"select cast(id as string) as sid from 'example.vortex' where id > 0"#)
        .await?
        .show()
        .await?;

    // Cast appearing only inside the filter predicate.
    ctx.session
        .sql(r#"select id from 'example.vortex' where cast (id as string) == '1'"#)
        .await?
        .show()
        .await?;

    // The failing case from the issue: a bare cast projection was pushed into
    // the scan. With pushdown disabled this must now succeed.
    ctx.session
        .sql(r#"select cast(id as string) from 'example.vortex'"#)
        .await?
        .collect()
        .await?;

    Ok(())
}
}
23 changes: 18 additions & 5 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ mod common_tests {
use vortex::session::VortexSession;

use crate::VortexFormatFactory;
use crate::VortexOptions;

static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

Expand All @@ -85,25 +86,37 @@ mod common_tests {

impl Default for TestSessionContext {
    fn default() -> Self {
        // Tests run without projection pushdown unless they opt in via `new(true)`.
        Self::new(false)
    }
}

impl TestSessionContext {
    /// Create a new test session context with the given projection pushdown setting.
    ///
    /// Registers a `VortexFormatFactory` configured with `projection_pushdown` and an
    /// in-memory object store mounted at `file://`, so tests can read and write
    /// `.vortex` paths directly via SQL.
    pub fn new(projection_pushdown: bool) -> Self {
        let store = Arc::new(InMemory::new());
        let opts = VortexOptions {
            projection_pushdown,
            ..Default::default()
        };
        let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
        let mut session_state_builder = SessionStateBuilder::new()
            .with_default_features()
            .with_table_factory(
                factory.get_ext().to_uppercase(),
                Arc::new(DefaultTableFactory::new()),
            )
            .with_object_store(&Url::try_from("file://").unwrap(), store.clone());

        // Register the Vortex format even when the builder has no file-format list
        // yet: an `if let Some(..)` here would silently skip registration if
        // `file_formats()` is `None`, leaving `.vortex` tables unreadable.
        session_state_builder
            .file_formats()
            .get_or_insert_with(Vec::new)
            .push(factory as _);

        let session: SessionContext =
            SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

        Self { store, session }
    }
}

impl TestSessionContext {
// Write arrow data into a vortex file.
pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
where
Expand Down
11 changes: 10 additions & 1 deletion vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ config_namespace! {
/// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
/// during footer parsing.
pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
/// Whether to enable projection pushdown into the underlying Vortex scan.
///
/// When enabled, projection expressions may be partially evaluated during
/// the scan. When disabled, Vortex reads only the referenced columns and
/// all expressions are evaluated after the scan.
pub projection_pushdown: bool, default = false
}
}

Expand Down Expand Up @@ -497,7 +503,10 @@ impl FileFormat for VortexFormat {
}

fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
    // Propagate the format-level pushdown option into the per-scan source.
    let source = VortexSource::new(table_schema, self.session.clone())
        .with_projection_pushdown(self.opts.projection_pushdown);
    Arc::new(source)
}
}

Expand Down
44 changes: 37 additions & 7 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::ops::Range;
use std::sync::Arc;
use std::sync::Weak;

use arrow_schema::Schema;
use datafusion_common::DataFusionError;
use datafusion_common::Result as DFResult;
use datafusion_common::ScalarValue;
Expand Down Expand Up @@ -92,6 +93,8 @@ pub(crate) struct VortexOpener {

pub expression_convertor: Arc<dyn ExpressionConvertor>,
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
pub projection_pushdown: bool,
}

impl FileOpener for VortexOpener {
Expand Down Expand Up @@ -124,6 +127,7 @@ impl FileOpener for VortexOpener {
let has_output_ordering = self.has_output_ordering;

let expr_convertor = self.expression_convertor.clone();
let projection_pushdown = self.projection_pushdown;

// Replace column access for partition columns with literals
#[allow(clippy::disallowed_types)]
Expand Down Expand Up @@ -225,18 +229,38 @@ impl FileOpener for VortexOpener {
let ProcessedProjection {
scan_projection,
leftover_projection,
} = expr_convertor.split_projection(
projection,
&this_file_schema,
&projected_physical_schema,
)?;
} = if projection_pushdown {
expr_convertor.split_projection(
projection.clone(),
&this_file_schema,
&projected_physical_schema,
)?
} else {
// When projection pushdown is disabled, read only the required columns
// and apply the full projection after the scan.
expr_convertor.no_pushdown_projection(projection.clone(), &this_file_schema)?
};

// The schema of the stream returned from the vortex scan.
// We use the physical_file_schema as reference for types that don't roundtrip.
// We use a reference schema for types that don't roundtrip (Dictionary, Utf8, etc.).
let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| {
exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan")
})?;
let stream_schema = calculate_physical_schema(&scan_dtype, &projected_physical_schema)?;

// When projection pushdown is enabled, the scan outputs the projected columns.
// When disabled, the scan outputs raw columns and the projection is applied after.
let scan_reference_schema = if projection_pushdown {
projected_physical_schema
} else {
// Build schema from the raw columns being read
let column_indices = projection.column_indices();
let fields: Vec<_> = column_indices
.into_iter()
.map(|idx| this_file_schema.field(idx).clone())
.collect();
Schema::new(fields)
};
let stream_schema = calculate_physical_schema(&scan_dtype, &scan_reference_schema)?;

let leftover_projection = leftover_projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
Expand Down Expand Up @@ -541,6 +565,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
projection_pushdown: false,
}
}

Expand Down Expand Up @@ -633,6 +658,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
projection_pushdown: false,
};

let filter = col("a").lt(lit(100_i32));
Expand Down Expand Up @@ -717,6 +743,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
projection_pushdown: false,
};

// The opener should successfully open the file and reorder columns
Expand Down Expand Up @@ -870,6 +897,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
projection_pushdown: false,
};

// This should succeed and return the correctly projected and cast data
Expand Down Expand Up @@ -927,6 +955,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
projection_pushdown: false,
}
}

Expand Down Expand Up @@ -1126,6 +1155,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
projection_pushdown: false,
};

let file = PartitionedFile::new(file_path.to_string(), data_size);
Expand Down
10 changes: 10 additions & 0 deletions vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub struct VortexSource {
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
projection_pushdown: bool,
}

impl VortexSource {
Expand All @@ -85,9 +87,16 @@ impl VortexSource {
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
file_metadata_cache: None,
projection_pushdown: false,
}
}

/// Enable or disable expression pushdown into the underlying Vortex scan.
pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
self.projection_pushdown = enabled;
self
}

/// Set a [`ExpressionConvertor`] to control how Datafusion expression should be converted and pushed down.
pub fn with_expression_convertor(
mut self,
Expand Down Expand Up @@ -160,6 +169,7 @@ impl FileSource for VortexSource {
has_output_ordering: !base_config.output_ordering.is_empty(),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: self.file_metadata_cache.clone(),
projection_pushdown: self.projection_pushdown,
};

Ok(Arc::new(opener))
Expand Down
Loading
Loading