
Commit

Merge branch 'main' into lead-lag-window-udf
jcsherin committed Aug 30, 2024
2 parents 734116c + e603185 commit bc8b7d4
Showing 72 changed files with 1,224 additions and 1,286 deletions.
355 changes: 156 additions & 199 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
@@ -32,8 +32,8 @@ readme = "README.md"
 [dependencies]
 arrow = { version = "52.2.0" }
 async-trait = "0.1.73"
-aws-config = "0.55"
-aws-credential-types = "0.55"
+aws-config = "1.5.5"
+aws-credential-types = "1.2.0"
 clap = { version = "4.5.16", features = ["derive", "cargo"] }
 datafusion = { path = "../datafusion/core", version = "41.0.0", features = [
     "avro",
3 changes: 2 additions & 1 deletion datafusion-cli/src/object_storage.rs
@@ -27,6 +27,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
 
 use async_trait::async_trait;
+use aws_config::BehaviorVersion;
 use aws_credential_types::provider::ProvideCredentials;
 use object_store::aws::{AmazonS3Builder, AwsCredential};
 use object_store::gcp::GoogleCloudStorageBuilder;
@@ -61,7 +62,7 @@ pub async fn get_s3_object_store_builder(
             builder = builder.with_token(session_token);
         }
     } else {
-        let config = aws_config::from_env().load().await;
+        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
         if let Some(region) = config.region() {
             builder = builder.with_region(region.to_string());
         }
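
The new `BehaviorVersion` import exists because the aws-config 0.55 → 1.x upgrade requires callers to pin an explicit SDK behavior version when loading shared configuration; `aws_config::defaults(...)` replaces the old `from_env()` pattern, as the hunk above shows. A minimal sketch of the new loading call, assuming only the aws-config 1.x and tokio crates:

```rust
use aws_config::BehaviorVersion;

#[tokio::main]
async fn main() {
    // aws-config 1.x: pin SDK behavior to an explicit major version.
    // `BehaviorVersion::latest()` opts in to the newest defaults.
    let config = aws_config::defaults(BehaviorVersion::latest()).load().await;

    // Region and credentials are still resolved from the environment,
    // just as with the old `aws_config::from_env().load()`.
    if let Some(region) = config.region() {
        println!("resolved region: {region}");
    }
}
```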
4 changes: 2 additions & 2 deletions datafusion-examples/examples/file_stream_provider.rs
@@ -39,7 +39,7 @@ mod non_windows {
     use datafusion::datasource::TableProvider;
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion_common::{exec_err, Result};
-    use datafusion_expr::Expr;
+    use datafusion_expr::SortExpr;
 
     // Number of lines written to FIFO
     const TEST_BATCH_SIZE: usize = 5;
@@ -49,7 +49,7 @@
     fn fifo_table(
         schema: SchemaRef,
         path: impl Into<PathBuf>,
-        sort: Vec<Vec<Expr>>,
+        sort: Vec<Vec<SortExpr>>,
     ) -> Arc<dyn TableProvider> {
         let source = FileStreamProvider::new_file(schema, path.into())
             .with_batch_size(TEST_BATCH_SIZE)
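
This import swap is the heart of the change the merge pulls in from main: sort expressions are no longer an `Expr` variant but a standalone `datafusion_expr::SortExpr` struct, and `Expr::sort(asc, nulls_first)` now returns one. A minimal sketch of constructing the new type, assuming the post-refactor crate in this branch (the column name `a` is illustrative):

```rust
use datafusion::logical_expr::SortExpr;
use datafusion::prelude::col;

fn main() {
    // `Expr::sort` now produces a standalone SortExpr (ascending,
    // nulls last here) instead of a removed `Expr::Sort` variant.
    let sort: SortExpr = col("a").sort(true, false);

    // Ordering-aware APIs now take Vec<Vec<SortExpr>>: the outer Vec
    // holds equivalent orderings, the inner Vec the keys of one ordering.
    let sort_order: Vec<Vec<SortExpr>> = vec![vec![sort]];
    println!("{sort_order:?}");
}
```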
75 changes: 1 addition & 74 deletions datafusion/common/src/column.rs
@@ -26,7 +26,6 @@ use std::collections::HashSet;
 use std::convert::Infallible;
 use std::fmt;
 use std::str::FromStr;
-use std::sync::Arc;
 
 /// A named reference to a qualified field in a schema.
 #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
@@ -156,79 +155,6 @@ impl Column {
         }
     }
 
-    /// Qualify column if not done yet.
-    ///
-    /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
-    /// ignored. Otherwise this will search through the given schemas to find the column. This will use the first schema
-    /// that matches.
-    ///
-    /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
-    /// exception for `USING` statements, see below.
-    ///
-    /// # Using columns
-    /// Take the following SQL statement:
-    ///
-    /// ```sql
-    /// SELECT id FROM t1 JOIN t2 USING(id)
-    /// ```
-    ///
-    /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
-    /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
-    /// in this example this would be `[{t1.id, t2.id}]`.
-    #[deprecated(
-        since = "20.0.0",
-        note = "use normalize_with_schemas_and_ambiguity_check instead"
-    )]
-    pub fn normalize_with_schemas(
-        self,
-        schemas: &[&Arc<DFSchema>],
-        using_columns: &[HashSet<Column>],
-    ) -> Result<Self> {
-        if self.relation.is_some() {
-            return Ok(self);
-        }
-
-        for schema in schemas {
-            let qualified_fields =
-                schema.qualified_fields_with_unqualified_name(&self.name);
-            match qualified_fields.len() {
-                0 => continue,
-                1 => {
-                    return Ok(Column::from(qualified_fields[0]));
-                }
-                _ => {
-                    // More than 1 field in this schema has its name set to self.name.
-                    //
-                    // This should only happen when a JOIN query with a USING constraint references
-                    // join columns using an unqualified column name. For example:
-                    //
-                    // ```sql
-                    // SELECT id FROM t1 JOIN t2 USING(id)
-                    // ```
-                    //
-                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
-                    // We will use the relation from the first matched field to normalize self.
-
-                    // Compare matched fields with one USING JOIN clause at a time
-                    let columns = schema.columns_with_unqualified_name(&self.name);
-                    for using_col in using_columns {
-                        let all_matched = columns.iter().all(|f| using_col.contains(f));
-                        // All matched fields belong to the same using column set, in other words
-                        // the same join clause. We simply pick the qualifier from the first match.
-                        if all_matched {
-                            return Ok(columns[0].clone());
-                        }
-                    }
-                }
-            }
-        }
-
-        _schema_err!(SchemaError::FieldNotFound {
-            field: Box::new(Column::new(self.relation.clone(), self.name)),
-            valid_fields: schemas.iter().flat_map(|s| s.columns()).collect(),
-        })
-    }
-
     /// Qualify column if not done yet.
     ///
     /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
@@ -381,6 +307,7 @@ mod tests {
     use super::*;
     use arrow::datatypes::DataType;
     use arrow_schema::SchemaBuilder;
+    use std::sync::Arc;
 
     fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
         let mut schema_builder = SchemaBuilder::new();
15 changes: 12 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
 use datafusion_common::{
     plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
 };
-use datafusion_expr::{case, is_null, lit};
+use datafusion_expr::{case, is_null, lit, SortExpr};
 use datafusion_expr::{
     utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
 };
@@ -577,7 +577,7 @@ impl DataFrame {
         self,
         on_expr: Vec<Expr>,
         select_expr: Vec<Expr>,
-        sort_expr: Option<Vec<Expr>>,
+        sort_expr: Option<Vec<SortExpr>>,
     ) -> Result<DataFrame> {
         let plan = LogicalPlanBuilder::from(self.plan)
             .distinct_on(on_expr, select_expr, sort_expr)?
@@ -776,6 +776,15 @@
         })
     }
 
+    /// Apply a sort by provided expressions with default direction
+    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
+        self.sort(
+            expr.into_iter()
+                .map(|e| e.sort(true, false))
+                .collect::<Vec<SortExpr>>(),
+        )
+    }
+
     /// Sort the DataFrame by the specified sorting expressions.
     ///
     /// Note that any expression can be turned into
@@ -797,7 +806,7 @@
     /// # Ok(())
     /// # }
     /// ```
-    pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
+    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
         let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
         Ok(DataFrame {
             session_state: self.session_state,
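
The new `sort_by` helper applies the default direction (ascending, nulls last) to every expression, while `sort` now takes `SortExpr` values directly. A sketch of both, assuming a context with a registered table `t` that has a column `a`:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn sort_examples(ctx: &SessionContext) -> Result<()> {
    let df = ctx.table("t").await?;

    // `sort` takes Vec<SortExpr>; direction is explicit
    // (descending, nulls first in this case).
    let _desc = df.clone().sort(vec![col("a").sort(false, true)])?;

    // `sort_by` is the shorthand: ascending, nulls last for every column.
    let _asc = df.sort_by(vec![col("a")])?;
    Ok(())
}
```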
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -31,7 +31,6 @@ use crate::datasource::{
 };
 use crate::error::Result;
 use crate::execution::context::{SessionConfig, SessionState};
-use crate::logical_expr::Expr;
 
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion_common::config::TableOptions;
@@ -41,6 +40,7 @@ use datafusion_common::{
 };
 
 use async_trait::async_trait;
+use datafusion_expr::SortExpr;
 
 /// Options that control the reading of CSV files.
 ///
@@ -84,7 +84,7 @@ pub struct CsvReadOptions<'a> {
     /// File compression type
     pub file_compression_type: FileCompressionType,
     /// Indicates how the file is sorted
-    pub file_sort_order: Vec<Vec<Expr>>,
+    pub file_sort_order: Vec<Vec<SortExpr>>,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -199,7 +199,7 @@ impl<'a> CsvReadOptions<'a> {
     }
 
     /// Configure if file has known sort order
-    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
         self.file_sort_order = file_sort_order;
         self
     }
@@ -231,7 +231,7 @@ pub struct ParquetReadOptions<'a> {
     /// based on data in file.
     pub schema: Option<&'a Schema>,
     /// Indicates how the file is sorted
-    pub file_sort_order: Vec<Vec<Expr>>,
+    pub file_sort_order: Vec<Vec<SortExpr>>,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
@@ -278,7 +278,7 @@ impl<'a> ParquetReadOptions<'a> {
     }
 
     /// Configure if file has known sort order
-    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
         self.file_sort_order = file_sort_order;
         self
     }
@@ -397,7 +397,7 @@ pub struct NdJsonReadOptions<'a> {
     /// Flag indicating whether this file may be unbounded (as in a FIFO file).
     pub infinite: bool,
     /// Indicates how the file is sorted
-    pub file_sort_order: Vec<Vec<Expr>>,
+    pub file_sort_order: Vec<Vec<SortExpr>>,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -452,7 +452,7 @@ impl<'a> NdJsonReadOptions<'a> {
     }
 
     /// Configure if file has known sort order
-    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
         self.file_sort_order = file_sort_order;
         self
    }
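
`CsvReadOptions`, `ParquetReadOptions`, and `NdJsonReadOptions` all receive the same signature change. A sketch for the CSV case, assuming a file already sorted by a column `a`:

```rust
use datafusion::prelude::*;

fn sorted_csv_options() -> CsvReadOptions<'static> {
    // Declare the on-disk order (ascending by `a`, nulls last) so the
    // planner can avoid a redundant sort when that order is required.
    CsvReadOptions::new().file_sort_order(vec![vec![col("a").sort(true, false)]])
}
```

Registration is unchanged, e.g. `ctx.register_csv("t", "data.csv", sorted_csv_options()).await?`.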
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -102,11 +102,10 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
         }
 
         // TODO other expressions are not handled yet:
-        // - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
+        // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
         // - Can `Wildcard` be considered as a `Literal`?
         // - ScalarVariable could be `applicable`, but that would require access to the context
         Expr::AggregateFunction { .. }
-            | Expr::Sort { .. }
             | Expr::WindowFunction { .. }
             | Expr::Wildcard { .. }
             | Expr::Unnest { .. }
14 changes: 4 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
@@ -33,8 +33,8 @@ use crate::datasource::{
 use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
 use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::TableType;
 use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
+use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
 
 use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -222,7 +222,7 @@ pub struct ListingOptions {
     /// ordering (encapsulated by a `Vec<Expr>`). If there aren't
     /// multiple equivalent orderings, the outer `Vec` will have a
     /// single element.
-    pub file_sort_order: Vec<Vec<Expr>>,
+    pub file_sort_order: Vec<Vec<SortExpr>>,
 }
 
 impl ListingOptions {
@@ -385,7 +385,7 @@ impl ListingOptions {
     ///
     /// assert_eq!(listing_options.file_sort_order, file_sort_order);
     /// ```
-    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
         self.file_sort_order = file_sort_order;
         self
     }
@@ -909,8 +909,7 @@ impl TableProvider for ListingTable {
             keep_partition_by_columns,
         };
 
-        let unsorted: Vec<Vec<Expr>> = vec![];
-        let order_requirements = if self.options().file_sort_order != unsorted {
+        let order_requirements = if !self.options().file_sort_order.is_empty() {
             // Multiple sort orders in outer vec are equivalent, so we pass only the first one
             let ordering = self
                 .try_create_output_ordering()?
@@ -1160,11 +1159,6 @@ mod tests {
         // (file_sort_order, expected_result)
         let cases = vec![
             (vec![], Ok(vec![])),
-            // not a sort expr
-            (
-                vec![vec![col("string_col")]],
-                Err("Expected Expr::Sort in output_ordering, but got string_col"),
-            ),
             // sort expr, but non column
             (
                 vec![vec![
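
With `SortExpr` enforced by the type system, the "not a sort expr" test case above can no longer be constructed, and an empty outer `Vec` now simply means "no known order" (hence the `is_empty()` check replacing the sentinel comparison). A sketch of declaring an ordering on `ListingOptions`, assuming a Parquet table sorted by a column `a`:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::col;

fn sorted_listing_options() -> ListingOptions {
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        // One known ordering; an empty Vec here now means "unsorted".
        .with_file_sort_order(vec![vec![col("a").sort(true, false)]])
}
```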
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
@@ -43,6 +43,7 @@ use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_expr::SortExpr;
 use futures::StreamExt;
 use log::debug;
 use parking_lot::Mutex;
@@ -64,7 +65,7 @@ pub struct MemTable {
     column_defaults: HashMap<String, Expr>,
     /// Optional pre-known sort order(s). Must be `SortExpr`s.
     /// inserting data into this table removes the order
-    pub sort_order: Arc<Mutex<Vec<Vec<Expr>>>>,
+    pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
 }
 
 impl MemTable {
@@ -118,7 +119,7 @@ impl MemTable {
     ///
     /// Note that multiple sort orders are supported, if some are known to be
     /// equivalent,
-    pub fn with_sort_order(self, mut sort_order: Vec<Vec<Expr>>) -> Self {
+    pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
         std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
         self
     }
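
`MemTable::with_sort_order` follows the same pattern. A minimal sketch, assuming a single-column batch whose data is already ordered by `a`:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::col;

fn sorted_mem_table() -> Result<MemTable> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    // The declared order must match the data; inserting into the table
    // clears it again, per the field docs.
    Ok(MemTable::try_new(schema, vec![vec![batch]])?
        .with_sort_order(vec![vec![col("a").sort(true, false)]]))
}
```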
(Diffs for the remaining changed files are not shown.)
