Add type relation
notfilippo committed Jul 2, 2024
1 parent 5427f4b commit 996bbdc
Showing 105 changed files with 1,446 additions and 1,688 deletions.
10 changes: 6 additions & 4 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -60,6 +60,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;
+ use datafusion_common::logical_type::schema::LogicalSchemaRef;

/// This example demonstrates using low level DataFusion APIs to read only
/// certain row groups and ranges from parquet files, based on external
@@ -299,8 +300,9 @@ impl IndexTableProvider {
// In this example, we use the PruningPredicate's literal guarantees to
// analyze the predicate. In a real system, using
// `PruningPredicate::prune` would likely be easier to do.
+ let schema = SchemaRef::new(self.schema().as_ref().clone().into());
let pruning_predicate =
- PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?;
+ PruningPredicate::try_new(Arc::clone(predicate), schema)?;

// The PruningPredicate's guarantees must all be satisfied in order for
// the predicate to possibly evaluate to true.
@@ -453,8 +455,8 @@ impl TableProvider for IndexTableProvider {
self
}

- fn schema(&self) -> SchemaRef {
- Arc::clone(&self.indexed_file.schema)
+ fn schema(&self) -> LogicalSchemaRef {
+ LogicalSchemaRef::new(self.indexed_file.schema.as_ref().clone().into())
}

fn table_type(&self) -> TableType {
@@ -482,7 +484,7 @@ impl TableProvider for IndexTableProvider {
.with_extensions(Arc::new(access_plan) as _);

// Prepare for scanning
- let schema = self.schema();
+ let schema = SchemaRef::new(self.schema().as_ref().clone().into());
let object_store_url = ObjectStoreUrl::parse("file://")?;
let file_scan_config = FileScanConfig::new(object_store_url, schema)
.with_limit(limit)
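
Note: the pattern introduced in this file is that TableProvider::schema() now hands back a LogicalSchemaRef, while PruningPredicate and FileScanConfig still consume an Arrow SchemaRef, so the provider converts back at the physical boundary. A minimal sketch of that round trip, assuming the Schema <-> LogicalSchema conversions implied by the .into() calls above:

use datafusion::arrow::datatypes::SchemaRef;
use datafusion_common::logical_type::schema::LogicalSchemaRef;

// Illustrative helper (not part of the commit): expose a logical schema to the
// planner, then rebuild the Arrow schema for the physical scan.
fn logical_and_physical(arrow: SchemaRef) -> (LogicalSchemaRef, SchemaRef) {
    // what the new TableProvider::schema() returns
    let logical = LogicalSchemaRef::new(arrow.as_ref().clone().into());
    // what PruningPredicate::try_new and FileScanConfig::new still consume
    let physical = SchemaRef::new(logical.as_ref().clone().into());
    (logical, physical)
}
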
15 changes: 9 additions & 6 deletions datafusion-examples/examples/custom_datasource.rs
@@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
- use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+ use datafusion::arrow::datatypes::{DataType, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
use datafusion::error::Result;
@@ -38,6 +38,8 @@ use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use tokio::time::timeout;
+ use datafusion_common::logical_type::field::LogicalField;
+ use datafusion_common::logical_type::schema::{LogicalSchema, LogicalSchemaRef};

/// This example demonstrates executing a simple query against a custom datasource
#[tokio::main]
@@ -162,10 +164,10 @@ impl TableProvider for CustomDataSource {
self
}

- fn schema(&self) -> SchemaRef {
- SchemaRef::new(Schema::new(vec![
- Field::new("id", DataType::UInt8, false),
- Field::new("bank_account", DataType::UInt64, true),
+ fn schema(&self) -> LogicalSchemaRef {
+ LogicalSchemaRef::new(LogicalSchema::new(vec![
+ LogicalField::new("id", DataType::UInt8, false),
+ LogicalField::new("bank_account", DataType::UInt64, true),
]))
}

@@ -181,7 +183,8 @@ impl TableProvider for CustomDataSource {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- return self.create_physical_plan(projection, self.schema()).await;
+ let schema = SchemaRef::new(self.schema().as_ref().clone().into());
+ return self.create_physical_plan(projection, schema).await;
}
}

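
The new schema() here is built directly from logical fields rather than wrapping an Arrow Schema. The construction, taken almost verbatim from the hunk above, as a standalone sketch:

use datafusion::arrow::datatypes::DataType;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::logical_type::schema::{LogicalSchema, LogicalSchemaRef};

// The two-column logical schema this custom datasource reports to the planner.
fn custom_source_schema() -> LogicalSchemaRef {
    LogicalSchemaRef::new(LogicalSchema::new(vec![
        LogicalField::new("id", DataType::UInt8, false),
        LogicalField::new("bank_account", DataType::UInt64, true),
    ]))
}
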
6 changes: 3 additions & 3 deletions datafusion-examples/examples/dataframe_subquery.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

- use datafusion_common::logical_type::LogicalType;
use std::sync::Arc;

+ use arrow_schema::DataType;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::prelude::*;
@@ -48,7 +48,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
scalar_subquery(Arc::new(
ctx.table("t2")
.await?
- .filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
+ .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.aggregate(vec![], vec![avg(col("t2.c2"))])?
.select(vec![avg(col("t2.c2"))])?
.into_unoptimized_plan(),
@@ -91,7 +91,7 @@ async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
.filter(exists(Arc::new(
ctx.table("t2")
.await?
- .filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
+ .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.select(vec![col("t2.c2")])?
.into_unoptimized_plan(),
)))?
11 changes: 5 additions & 6 deletions datafusion-examples/examples/expr_api.rs
@@ -30,7 +30,6 @@ use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::logical_type::schema::LogicalSchema;
- use datafusion_common::logical_type::LogicalType;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
@@ -214,7 +213,7 @@ fn simplify_demo() -> Result<()> {
// String --> Date simplification
// `cast('2020-09-01' as date)` --> 18500
assert_eq!(
- simplifier.simplify(lit("2020-09-01").cast_to(&LogicalType::Date32, &schema)?)?,
+ simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32.into(), &schema)?)?,
lit(ScalarValue::Date32(Some(18506)))
);

@@ -296,14 +295,14 @@ fn expression_type_demo() -> Result<()> {
// a schema. In this case we create a schema where the column `c` is of
// type Utf8 (a String / VARCHAR)
let schema = DFSchema::from_unqualified_fields(
vec![LogicalField::new("c", LogicalType::Utf8, true)].into(),
vec![LogicalField::new("c", DataType::Utf8, true)].into(),
HashMap::new(),
)?;
assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));

// Using a schema where the column `foo` is of type Int32
let schema = DFSchema::from_unqualified_fields(
vec![LogicalField::new("c", LogicalType::Int32, true)].into(),
vec![LogicalField::new("c", DataType::Int32, true)].into(),
HashMap::new(),
)?;
assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));
@@ -313,8 +312,8 @@ fn expression_type_demo() -> Result<()> {
let expr = col("c1") + col("c2");
let schema = DFSchema::from_unqualified_fields(
vec![
LogicalField::new("c1", LogicalType::Int32, true),
LogicalField::new("c2", LogicalType::Float32, true),
LogicalField::new("c1", DataType::Int32, true),
LogicalField::new("c2", DataType::Float32, true),
]
.into(),
HashMap::new(),
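
In this example the schema-aware expression APIs now take logical fields, while literal Arrow DataTypes convert in via .into(). A brief sketch of type inference against such a schema, assuming get_type behaves as the asserts above show (the helper name is illustrative):

use std::collections::HashMap;
use arrow::datatypes::DataType;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, ExprSchemable};

fn type_of_c() -> Result<()> {
    // Schema with a single Utf8 column `c`, built from a LogicalField.
    let schema = DFSchema::from_unqualified_fields(
        vec![LogicalField::new("c", DataType::Utf8, true)].into(),
        HashMap::new(),
    )?;
    // Resolve the expression's type against the logical schema.
    let ty = col("c").get_type(&schema)?;
    assert_eq!("Utf8", format!("{ty}"));
    Ok(())
}
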
7 changes: 4 additions & 3 deletions datafusion-examples/examples/function_factory.rs
@@ -22,7 +22,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
- use datafusion_common::logical_type::extension::ExtensionType;
+ use datafusion_common::logical_type::ExtensionType;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
@@ -218,13 +218,14 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
return_type: definition
.return_type
.expect("Return type has to be defined!")
- .physical_type(),
+ .physical()
+ .clone(),
signature: Signature::exact(
definition
.args
.unwrap_or_default()
.into_iter()
- .map(|a| a.data_type.physical_type())
+ .map(|a| a.data_type.physical().clone())
.collect(),
definition
.params
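
Here the ScalarFunctionWrapper conversion drops from the declared logical types to Arrow types through the renamed physical() accessor, cloning because a reference comes back. A small sketch of the same move, assuming ExtensionType provides the accessor as the import above suggests:

use arrow_schema::DataType;
use datafusion_common::logical_type::{ExtensionType, TypeRelation};
use datafusion_expr::{Signature, Volatility};

// Build an exact signature from logical argument types by taking each type's
// physical (Arrow) representation, mirroring the wrapper above.
fn exact_signature(args: &[TypeRelation]) -> Signature {
    let arrow_types: Vec<DataType> = args.iter().map(|t| t.physical().clone()).collect();
    Signature::exact(arrow_types, Volatility::Immutable)
}
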
3 changes: 3 additions & 0 deletions datafusion-examples/examples/logical_type.rs
@@ -0,0 +1,3 @@
+ fn main() {
+
+ }
8 changes: 5 additions & 3 deletions datafusion-examples/examples/parquet_index.rs
@@ -54,6 +54,7 @@ use std::sync::{
};
use tempfile::TempDir;
use url::Url;
+ use datafusion_common::logical_type::schema::LogicalSchemaRef;

/// This example demonstrates building a secondary index over multiple Parquet
/// files and using that index during query to skip ("prune") files that do not
@@ -212,8 +213,8 @@ impl TableProvider for IndexTableProvider {
self
}

- fn schema(&self) -> SchemaRef {
- self.index.schema().clone()
+ fn schema(&self) -> LogicalSchemaRef {
+ LogicalSchemaRef::new(self.index.schema().into())
}

fn table_type(&self) -> TableType {
@@ -243,7 +244,8 @@ impl TableProvider for IndexTableProvider {
let files = self.index.get_files(predicate.clone())?;

let object_store_url = ObjectStoreUrl::parse("file://")?;
- let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
+ let schema = SchemaRef::new(self.schema().as_ref().clone().into());
+ let mut file_scan_config = FileScanConfig::new(object_store_url, schema)
.with_projection(projection.cloned())
.with_limit(limit);

9 changes: 5 additions & 4 deletions datafusion-examples/examples/rewrite_expr.rs
@@ -17,7 +17,7 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
- use datafusion_common::logical_type::LogicalType;
+ use datafusion_common::logical_type::TypeRelation;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
@@ -32,6 +32,7 @@ use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;
use std::any::Any;
use std::sync::Arc;
+ use datafusion_common::logical_type::schema::LogicalSchemaRef;

pub fn main() -> Result<()> {
// produce a logical plan using the datafusion-sql crate
@@ -212,7 +213,7 @@ impl ContextProvider for MyContextProvider {
None
}

- fn get_variable_type(&self, _variable_names: &[String]) -> Option<LogicalType> {
+ fn get_variable_type(&self, _variable_names: &[String]) -> Option<TypeRelation> {
None
}

@@ -246,7 +247,7 @@ impl TableSource for MyTableSource {
self
}

- fn schema(&self) -> SchemaRef {
- self.schema.clone()
+ fn schema(&self) -> LogicalSchemaRef {
+ LogicalSchemaRef::new(self.schema.as_ref().clone().into())
}
}
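
Both trait changes in this file move the planner-facing surface to logical types: get_variable_type now reports an Option<TypeRelation> and TableSource::schema a LogicalSchemaRef. Reassembling the relevant parts of MyTableSource from the hunks above into one place, as a sketch:

use std::any::Any;
use arrow::datatypes::SchemaRef;
use datafusion_common::logical_type::schema::LogicalSchemaRef;
use datafusion_expr::TableSource;

struct MyTableSource {
    schema: SchemaRef,
}

impl TableSource for MyTableSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // The stored schema stays Arrow; it is converted to a logical schema on demand.
    fn schema(&self) -> LogicalSchemaRef {
        LogicalSchemaRef::new(self.schema.as_ref().clone().into())
    }
}
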
8 changes: 4 additions & 4 deletions datafusion-examples/examples/simple_udtf.rs
@@ -35,7 +35,7 @@ use std::fs::File;
use std::io::Seek;
use std::path::Path;
use std::sync::Arc;

+ use datafusion_common::logical_type::schema::LogicalSchemaRef;
// To define your own table function, you only need to do the following 3 things:
// 1. Implement your own [`TableProvider`]
// 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
@@ -85,8 +85,8 @@ impl TableProvider for LocalCsvTable {
self
}

- fn schema(&self) -> SchemaRef {
- self.schema.clone()
+ fn schema(&self) -> LogicalSchemaRef {
+ LogicalSchemaRef::new(self.schema.clone().into())
}

fn table_type(&self) -> TableType {
@@ -121,7 +121,7 @@ impl TableProvider for LocalCsvTable {
};
Ok(Arc::new(MemoryExec::try_new(
&[batches],
- TableProvider::schema(self),
+ self.schema.clone(),
projection.cloned(),
)?))
}
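
The scan above now feeds MemoryExec the provider's stored Arrow schema instead of TableProvider::schema(self), since the latter has become logical. A condensed sketch of that call, assuming the MemoryExec constructor used in the example:

use std::sync::Arc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;

// Execute over already-materialized batches; the physical (Arrow) schema is
// passed straight through to MemoryExec.
fn scan_batches(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
    projection: Option<&Vec<usize>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(Arc::new(MemoryExec::try_new(
        &[batches],
        schema,
        projection.cloned(),
    )?))
}
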
29 changes: 14 additions & 15 deletions datafusion/common/src/dfschema.rs
@@ -31,10 +31,9 @@ use crate::{

use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
- use crate::logical_type::extension::ExtensionType;
use crate::logical_type::field::{LogicalField, LogicalFieldRef};
use crate::logical_type::fields::LogicalFields;
- use crate::logical_type::LogicalType;
+ use crate::logical_type::{TypeRelation, ExtensionType};
use crate::logical_type::schema::{LogicalSchema, LogicalSchemaBuilder, LogicalSchemaRef};

/// A reference-counted reference to a [DFSchema].
@@ -165,7 +164,7 @@ impl DFSchema {
/// Create a new `DFSchema` from a list of Arrow [Field]s
#[allow(deprecated)]
pub fn from_unqualified_fields(
- fields: Fields,
+ fields: LogicalFields,
metadata: HashMap<String, String>,
) -> Result<Self> {
Self::from_unqualifed_fields(fields, metadata)
@@ -675,15 +674,15 @@ impl DFSchema {
self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
q1 == q2
&& f1.name() == f2.name()
- && Self::datatype_is_semantically_equal(&f1.data_type().physical_type(), &f2.data_type().physical_type())
+ && Self::datatype_is_semantically_equal(&f1.data_type().physical(), &f2.data_type().physical())
})
}

/// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
/// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
/// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
/// logically equal to Dictionary<K2, V1>.
- pub fn datatype_is_logically_equal(dt1: &LogicalType, dt2: &LogicalType) -> bool {
+ pub fn datatype_is_logically_equal(dt1: &TypeRelation, dt2: &TypeRelation) -> bool {
dt1 == dt2
}

@@ -916,13 +915,13 @@ pub trait ExprSchema: std::fmt::Debug {
fn nullable(&self, col: &Column) -> Result<bool>;

/// What is the datatype of this column?
- fn data_type(&self, col: &Column) -> Result<&LogicalType>;
+ fn data_type(&self, col: &Column) -> Result<&TypeRelation>;

/// Returns the column's optional metadata.
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;

/// Return the coulmn's datatype and nullability
- fn data_type_and_nullable(&self, col: &Column) -> Result<(&LogicalType, bool)>;
+ fn data_type_and_nullable(&self, col: &Column) -> Result<(&TypeRelation, bool)>;
}

// Implement `ExprSchema` for `Arc<DFSchema>`
@@ -931,15 +930,15 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
self.as_ref().nullable(col)
}

- fn data_type(&self, col: &Column) -> Result<&LogicalType> {
+ fn data_type(&self, col: &Column) -> Result<&TypeRelation> {
self.as_ref().data_type(col)
}

fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
ExprSchema::metadata(self.as_ref(), col)
}

- fn data_type_and_nullable(&self, col: &Column) -> Result<(&LogicalType, bool)> {
+ fn data_type_and_nullable(&self, col: &Column) -> Result<(&TypeRelation, bool)> {
self.as_ref().data_type_and_nullable(col)
}
}
@@ -949,15 +948,15 @@ impl ExprSchema for DFSchema {
Ok(self.field_from_column(col)?.is_nullable())
}

- fn data_type(&self, col: &Column) -> Result<&LogicalType> {
+ fn data_type(&self, col: &Column) -> Result<&TypeRelation> {
Ok(self.field_from_column(col)?.data_type())
}

fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
Ok(self.field_from_column(col)?.metadata())
}

- fn data_type_and_nullable(&self, col: &Column) -> Result<(&LogicalType, bool)> {
+ fn data_type_and_nullable(&self, col: &Column) -> Result<(&TypeRelation, bool)> {
let field = self.field_from_column(col)?;
Ok((field.data_type(), field.is_nullable()))
}
@@ -1080,8 +1079,8 @@ mod tests {
let schema = DFSchema::from_field_specific_qualified_schema(
vec![Some("t1".into()), None],
&Arc::new(LogicalSchema::new(vec![
LogicalField::new("c0", LogicalType::Boolean, true),
LogicalField::new("c1", LogicalType::Boolean, true),
LogicalField::new("c0", DataType::Boolean, true),
LogicalField::new("c1", DataType::Boolean, true),
])),
)?;
assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
@@ -1333,8 +1332,8 @@ mod tests {

fn test_schema_2() -> LogicalSchema {
LogicalSchema::new(vec![
LogicalField::new("c100", LogicalType::Boolean, true),
LogicalField::new("c101", LogicalType::Boolean, true),
LogicalField::new("c100", DataType::Boolean, true),
LogicalField::new("c101", DataType::Boolean, true),
])
}

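
With ExprSchema returning &TypeRelation, per-column checks compare logical types, and the Arrow type stays reachable through the physical() accessor when a physical decision is needed. A rough sketch using the accessors shown in this diff (the helper name is illustrative):

use datafusion_common::logical_type::{ExtensionType, TypeRelation};
use datafusion_common::{Column, DFSchema, ExprSchema, Result};

// True when two columns share the same logical type; the underlying Arrow
// types remain available via ExtensionType::physical().
fn same_logical_type(schema: &DFSchema, a: &Column, b: &Column) -> Result<bool> {
    let ta: &TypeRelation = schema.data_type(a)?;
    let tb: &TypeRelation = schema.data_type(b)?;
    let _arrow_ty = ta.physical(); // e.g. &DataType::Utf8
    Ok(DFSchema::datatype_is_logically_equal(ta, tb))
}
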
(Diff for the remaining changed files is not shown here.)
