
Commit

Merge remote-tracking branch 'apache/main' into alamb/recompute_schema
alamb committed May 9, 2024
2 parents bd7b62f + 2a15614 commit 00d03f3
Showing 56 changed files with 1,256 additions and 855 deletions.
2 changes: 1 addition & 1 deletion benchmarks/README.md
@@ -150,7 +150,7 @@ Benchmark tpch_mem.json
└──────────────┴──────────────┴──────────────┴───────────────┘
```

-Note that you can also execute an automatic comparison of the changes in a given PR against the base
+Note that you can also execute an automatic comparison of the changes in a given PR against the base
just by including the trigger `/benchmark` in any comment.

### Running Benchmarks Manually
25 changes: 25 additions & 0 deletions datafusion/common/src/utils.rs
@@ -651,6 +651,22 @@ pub fn find_indices<T: PartialEq, S: Borrow<T>>(
.ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
}

/// Transposes the given vector of vectors.
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
match original.as_slice() {
[] => vec![],
[first, ..] => {
let mut result = (0..first.len()).map(|_| vec![]).collect::<Vec<_>>();
for row in original {
for (item, transposed_row) in row.into_iter().zip(&mut result) {
transposed_row.push(item);
}
}
result
}
}
}

#[cfg(test)]
mod tests {
use crate::ScalarValue::Null;
@@ -990,4 +1006,13 @@ mod tests {
assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
Ok(())
}

#[test]
fn test_transpose() -> Result<()> {
let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
let transposed = transpose(in_data);
let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
assert_eq!(expected, transposed);
Ok(())
}
}
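
As a quick illustration of the new helper (a standalone sketch; the `datafusion_common::utils` path follows from the file above, and note that rows longer than the first row are silently truncated to its width):

```rust
use datafusion_common::utils::transpose;

fn main() {
    // A 2 x 3 matrix of rows becomes a 3 x 2 matrix of columns.
    let rows = vec![vec![1, 2, 3], vec![4, 5, 6]];
    assert_eq!(transpose(rows), vec![vec![1, 4], vec![2, 5], vec![3, 6]]);

    // The empty case falls through the `[]` match arm.
    let empty: Vec<Vec<i32>> = vec![];
    assert!(transpose(empty).is_empty());
}
```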
65 changes: 21 additions & 44 deletions datafusion/core/src/catalog/information_schema.rs
@@ -107,26 +107,14 @@ impl InformationSchemaConfig {
}

// Add a final list for the information schema tables themselves
-        builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
-        builder.add_table(&catalog_name, INFORMATION_SCHEMA, VIEWS, TableType::View);
-        builder.add_table(
-            &catalog_name,
-            INFORMATION_SCHEMA,
-            COLUMNS,
-            TableType::View,
-        );
-        builder.add_table(
-            &catalog_name,
-            INFORMATION_SCHEMA,
-            DF_SETTINGS,
-            TableType::View,
-        );
-        builder.add_table(
-            &catalog_name,
-            INFORMATION_SCHEMA,
-            SCHEMATA,
-            TableType::View,
-        );
+        for table_name in INFORMATION_SCHEMA_TABLES {
+            builder.add_table(
+                &catalog_name,
+                INFORMATION_SCHEMA,
+                table_name,
+                TableType::View,
+            );
+        }
}

Ok(())
@@ -225,37 +213,29 @@ impl InformationSchemaConfig {

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
-    fn as_any(&self) -> &(dyn Any + 'static) {
+    fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
-        vec![
-            TABLES.to_string(),
-            VIEWS.to_string(),
-            COLUMNS.to_string(),
-            DF_SETTINGS.to_string(),
-            SCHEMATA.to_string(),
-        ]
+        INFORMATION_SCHEMA_TABLES
+            .iter()
+            .map(|t| t.to_string())
+            .collect()
}

async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
let config = self.config.clone();
-        let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
-            Arc::new(InformationSchemaTables::new(config))
-        } else if name.eq_ignore_ascii_case("columns") {
-            Arc::new(InformationSchemaColumns::new(config))
-        } else if name.eq_ignore_ascii_case("views") {
-            Arc::new(InformationSchemaViews::new(config))
-        } else if name.eq_ignore_ascii_case("df_settings") {
-            Arc::new(InformationSchemaDfSettings::new(config))
-        } else if name.eq_ignore_ascii_case("schemata") {
-            Arc::new(InformationSchemata::new(config))
-        } else {
-            return Ok(None);
+        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
+            TABLES => Arc::new(InformationSchemaTables::new(config)),
+            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
+            VIEWS => Arc::new(InformationSchemaViews::new(config)),
+            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
+            SCHEMATA => Arc::new(InformationSchemata::new(config)),
+            _ => return Ok(None),
};

Ok(Some(Arc::new(
@@ -264,10 +244,7 @@ impl SchemaProvider for InformationSchemaProvider {
}

fn table_exist(&self, name: &str) -> bool {
-        matches!(
-            name.to_ascii_lowercase().as_str(),
-            TABLES | VIEWS | COLUMNS | SCHEMATA
-        )
+        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
}
}

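Two things are worth noting in this refactor: the hand-rolled lists are replaced by a shared `INFORMATION_SCHEMA_TABLES` constant, and `table_exist` now covers `DF_SETTINGS`, which the old `matches!` arm omitted. The constant's definition sits outside the shown hunks; presumably it is a slice of the existing view-name constants, roughly as follows (a hypothetical reconstruction, not part of the visible diff):

```rust
/// Assumed definition: the one shared list of information_schema views,
/// built from the name constants already used throughout this file.
pub(crate) const INFORMATION_SCHEMA_TABLES: &[&str] =
    &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA];
```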
23 changes: 21 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
@@ -1587,8 +1587,9 @@ mod tests {
use datafusion_common::{Constraint, Constraints};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
-        cast, count_distinct, create_udf, expr, lit, sum, BuiltInWindowFunction,
-        ScalarFunctionImplementation, Volatility, WindowFrame, WindowFunctionDefinition,
+        array_agg, cast, count_distinct, create_udf, expr, lit, sum,
+        BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
+        WindowFunctionDefinition,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
@@ -2044,6 +2045,24 @@
Ok(())
}

// Test issue: https://github.com/apache/datafusion/issues/10346
#[tokio::test]
async fn test_select_over_aggregate_schema() -> Result<()> {
let df = test_table()
.await?
.with_column("c", col("c1"))?
.aggregate(vec![], vec![array_agg(col("c")).alias("c")])?
.select(vec![col("c")])?;

assert_eq!(df.schema().fields().len(), 1);
let field = df.schema().field(0);
// There are two columns named 'c', one from the input of the aggregate and the other from the output.
// Select should return the column from the output of the aggregate, which is a list.
assert!(matches!(field.data_type(), DataType::List(_)));

Ok(())
}

#[tokio::test]
async fn test_distinct() -> Result<()> {
let t = test_table().await?;
18 changes: 7 additions & 11 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -38,7 +38,7 @@ use log::{debug, trace};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema, DataFusionError};
-use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility};
+use datafusion_expr::{Expr, Volatility};
use datafusion_physical_expr::create_physical_expr;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
@@ -89,16 +89,12 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::Case { .. } => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
-            match &scalar_function.func_def {
-                ScalarFunctionDefinition::UDF(fun) => {
-                    match fun.signature().volatility {
-                        Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
-                        // TODO: Stable functions could be `applicable`, but that would require access to the context
-                        Volatility::Stable | Volatility::Volatile => {
-                            is_applicable = false;
-                            Ok(TreeNodeRecursion::Stop)
-                        }
-                    }
+            match scalar_function.func.signature().volatility {
+                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
+                // TODO: Stable functions could be `applicable`, but that would require access to the context
+                Volatility::Stable | Volatility::Volatile => {
+                    is_applicable = false;
+                    Ok(TreeNodeRecursion::Stop)
+                }
}
}
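The rewritten match also shows why only `Volatility::Immutable` functions are `applicable` for partition pruning: an immutable function returns the same output for the same input, so a filter containing it can be evaluated once against partition values, while stable and volatile functions may change across (or even within) queries. A minimal sketch of that check, assuming only `ScalarUDF` and `Volatility` from `datafusion_expr`:

```rust
use std::sync::Arc;

use datafusion_expr::{ScalarUDF, Volatility};

/// Sketch: a filter is safe to pre-evaluate against partition columns
/// only if every scalar function in it is immutable.
fn udf_is_applicable(udf: &Arc<ScalarUDF>) -> bool {
    match udf.signature().volatility {
        Volatility::Immutable => true,
        // Stable functions (e.g. now()) are constant within a query but
        // not across queries; volatile functions (e.g. random()) change
        // per row. Neither can be used for pruning here.
        Volatility::Stable | Volatility::Volatile => false,
    }
}
```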
32 changes: 31 additions & 1 deletion datafusion/core/src/datasource/listing/url.rs
@@ -187,7 +187,7 @@ impl ListingTableUrl {

/// Returns `true` if `path` refers to a collection of objects
pub fn is_collection(&self) -> bool {
-        self.url.as_str().ends_with(DELIMITER)
+        self.url.path().ends_with(DELIMITER)
}

/// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
@@ -463,4 +463,34 @@ mod tests {
Some(("/a/b/c//", "alltypes_plain*.parquet")),
);
}

#[test]
fn test_is_collection() {
fn test(input: &str, expected: bool, message: &str) {
let url = ListingTableUrl::parse(input).unwrap();
assert_eq!(url.is_collection(), expected, "{message}");
}

test("https://a.b.c/path/", true, "path ends with / - collection");
test(
"https://a.b.c/path/?a=b",
true,
"path ends with / - with query args - collection",
);
test(
"https://a.b.c/path?a=b/",
false,
"path not ends with / - query ends with / - not collection",
);
test(
"https://a.b.c/path/#a=b",
true,
"path ends with / - with fragment - collection",
);
test(
"https://a.b.c/path#a=b/",
false,
"path not ends with / - fragment ends with / - not collection",
);
}
}
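
The one-line fix above is what the new tests pin down: `Url::as_str` serializes the entire URL, query string and fragment included, so a trailing `/` in `?a=b/` previously made `is_collection` report a directory. `Url::path` inspects only the path component. A standalone illustration using the `url` crate, which `ListingTableUrl` wraps:

```rust
use url::Url;

fn main() {
    let u = Url::parse("https://a.b.c/path?a=b/").unwrap();
    // The full serialization ends with '/', but only because of the query.
    assert!(u.as_str().ends_with('/'));
    // The path component does not, so this URL is not a collection.
    assert!(!u.path().ends_with('/'));
}
```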
19 changes: 5 additions & 14 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1301,8 +1301,7 @@ mod tests {
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{
-        ColumnarValue, Operator, ScalarFunctionDefinition, ScalarUDF, ScalarUDFImpl,
-        Signature, Volatility,
+        ColumnarValue, Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, CastExpr, NegativeExpr,
@@ -1363,9 +1362,7 @@
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
-            ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
-                DummyUDF::new(),
-            ))),
+            Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
@@ -1431,9 +1428,7 @@
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 5)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
-            ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
-                DummyUDF::new(),
-            ))),
+            Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
@@ -1502,9 +1497,7 @@
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
-            ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
-                DummyUDF::new(),
-            ))),
+            Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
@@ -1570,9 +1563,7 @@
Arc::new(NegativeExpr::new(Arc::new(Column::new("f_new", 5)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
-            ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
-                DummyUDF::new(),
-            ))),
+            Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b_new", 1)),
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_planner.rs
@@ -239,9 +239,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
}
};
}
-        Expr::ScalarFunction(fun) => {
-            create_function_physical_name(fun.name(), false, &fun.args, None)
-        }
+        Expr::ScalarFunction(fun) => fun.func.display_name(&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
args,
@@ -491,6 +489,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
let plan = self
.create_initial_plan(logical_plan, session_state)
.await?;

self.optimize_internal(plan, session_state, |_, _| {})
}
}
13 changes: 13 additions & 0 deletions datafusion/core/tests/sql/sql_api.rs
@@ -58,6 +58,19 @@ async fn unsupported_dml_returns_error() {
ctx.sql_with_options(sql, options).await.unwrap();
}

#[tokio::test]
async fn dml_output_schema() {
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};

let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let sql = "INSERT INTO test VALUES (1)";
let df = ctx.sql(sql).await.unwrap();
let count_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]);
assert_eq!(Schema::from(df.schema()), count_schema);
}
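
A companion sketch, not part of this commit, showing what executing that DML plan returns; the test name and the `UInt64Array` downcast are assumptions based on DataFusion's behavior of reporting inserted-row counts:

```rust
#[tokio::test]
async fn dml_row_count_value() {
    use arrow::array::UInt64Array;

    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
    let df = ctx.sql("INSERT INTO test VALUES (1)").await.unwrap();

    // Executing the INSERT yields one batch whose single UInt64 "count"
    // column holds the number of rows written.
    let batches = df.collect().await.unwrap();
    let counts = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();
    assert_eq!(counts.value(0), 1);
}
```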

#[tokio::test]
async fn unsupported_copy_returns_error() {
let tmpdir = TempDir::new().unwrap();