1,506 changes: 780 additions & 726 deletions Cargo.lock

Large diffs are not rendered by default.

60 changes: 28 additions & 32 deletions Cargo.toml
@@ -2,20 +2,20 @@
members = ["clade"]

[workspace.dependencies]
arrow = "50.0.0"
arrow-buffer = "50.0.0"
arrow-csv = "50.0.0"
arrow-flight = "50.0.0"
arrow = "51.0.0"
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-flight = "51.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "50.0.0"
arrow-schema = "50.0.0"
arrow-integration-test = "51.0.0"
arrow-schema = "51.0.0"
async-trait = "0.1.64"

datafusion = "36.0.0"
datafusion-common = "36.0.0"
datafusion-expr = "36.0.0"
datafusion = "37.1.0"
datafusion-common = "37.1.0"
datafusion-expr = "37.1.0"

itertools = ">=0.10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
@@ -51,20 +51,16 @@ object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[patch.crates-io]
# Pick up https://github.com/apache/arrow-rs/pull/5282
arrow-arith = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-array = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-cast = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-csv = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-data = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-json = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-ord = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-row = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-select = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-string = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
# Pick up backport of https://github.com/apache/arrow-datafusion/pull/10114
datafusion = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-common = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-execution = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-optimizer = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-physical-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-physical-plan = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-proto = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-sql = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }

[dependencies]
arrow = { workspace = true }
@@ -80,14 +76,14 @@ async-trait = { workspace = true }
base64 = "0.21.0"

bytes = "1.4.0"
chrono = { version = "0.4", default_features = false }
chrono = { version = "0.4", default-features = false }
clade = { path = "clade" }
clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-36-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-36-upgrade", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-37.1-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-37.1-upgrade", optional = true }

dashmap = "5.4.0"

@@ -97,16 +93,16 @@ datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "c223bb66dd518fe2f7a6d5ba29e67267aaf95876", features = ["datafusion"] }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "28ad3950d90573fa8ff413c336b657b8561e1d41", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
itertools = { workspace = true }
lazy_static = ">=1.4.0"
metrics = { version = "0.22.1" }
metrics-exporter-prometheus = { version = "0.13.1" }
moka = { version = "0.12.5", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.9"
moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.9.1"
percent-encoding = "2.2.0"
prost = "0.12.1"

@@ -122,15 +118,15 @@ rustyline = "13.0"
serde = "1.0.156"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = { version = "0.43", features = ["visitor"] }
sqlparser = { version = "0.44", features = ["visitor"] }
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
tempfile = "3"
thiserror = "1"
tokio = { workspace = true }
tokio-graceful-shutdown = { version = "0.14" }
tonic = { version = "0.10.0", optional = true }
tonic = { version = "0.11.0", optional = true }
tower = "0.4"
tracing = { workspace = true }
tracing-log = "0.2"
@@ -152,7 +148,7 @@ aws-credential-types = { version = "1.1.5", features = ["hardcoded-credentials"]
aws-sdk-sts = { version = "1.3.1", features = ["behavior-version-latest"] }
rstest = "*"
serial_test = "2"
tonic-reflection = "0.10"
tonic-reflection = "0.11"
wiremock = "0.5"

[build-dependencies]
2 changes: 1 addition & 1 deletion clade/Cargo.toml
@@ -5,7 +5,7 @@ edition = "2021"

[dependencies]
prost = "0.12"
tonic = "0.10"
tonic = "0.11"

[build-dependencies]
tonic-build = "0.10"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-36-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-37.1-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
12 changes: 6 additions & 6 deletions datafusion_remote_tables/src/filter_pushdown.rs
@@ -5,7 +5,7 @@ use arrow::temporal_conversions::{
use datafusion::common::{Column, DataFusionError};
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use itertools::Itertools;
@@ -109,9 +109,9 @@ pub trait FilterPushdownConverter {
}

impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
type N = Expr;
type Node = Expr;

fn pre_visit(&mut self, expr: &Expr) -> Result<VisitRecursion> {
fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
match expr {
Expr::Column(_)
| Expr::Literal(_)
Expand Down Expand Up @@ -140,10 +140,10 @@ impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
)));
}
};
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}

fn post_visit(&mut self, expr: &Expr) -> Result<VisitRecursion> {
fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
match expr {
// Column and Literal are the only two leaf nodes atm - they don't depend on any SQL
// expression being on the stack.
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
}
_ => {}
};
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}
}

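Note: this file tracks the DataFusion 37 `TreeNode` refactor, which renames `type N` to `type Node`, `pre_visit`/`post_visit` to `f_down`/`f_up`, and `VisitRecursion` to `TreeNodeRecursion`. A minimal sketch of the new visitor shape, assuming DataFusion 37.1 APIs (the `ColumnCounter` type and its field are hypothetical, for illustration only):

```rust
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::Result;
use datafusion_expr::Expr;

/// Hypothetical visitor that counts column references in an expression tree.
#[derive(Default)]
struct ColumnCounter {
    columns: usize,
}

impl TreeNodeVisitor for ColumnCounter {
    type Node = Expr;

    // Top-down hook; this replaces the old `pre_visit`.
    fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
        if matches!(expr, Expr::Column(_)) {
            self.columns += 1;
        }
        Ok(TreeNodeRecursion::Continue)
    }

    // Bottom-up hook; this replaces the old `post_visit`.
    fn f_up(&mut self, _expr: &Expr) -> Result<TreeNodeRecursion> {
        Ok(TreeNodeRecursion::Continue)
    }
}

// Usage: `expr.visit(&mut ColumnCounter::default())?` walks the expression
// tree, calling f_down on the way down and f_up on the way back up.
```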
3 changes: 2 additions & 1 deletion src/context/delta.rs
@@ -13,7 +13,7 @@ use datafusion::{
error::DataFusionError,
execution::context::TaskContext,
parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
physical_plan::ExecutionPlan,
physical_plan::{ExecutionPlan, ExecutionPlanProperties},
sql::TableReference,
};
use deltalake::kernel::{Action, Add, Schema as DeltaSchema};
@@ -29,6 +29,7 @@ use object_store::path::Path;
use std::fs::File;
use std::sync::Arc;
use tempfile::{NamedTempFile, TempPath};

use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Semaphore;
1 change: 1 addition & 0 deletions src/context/logical.rs
@@ -301,6 +301,7 @@ impl SeafowlContext {
source: CopyToSource::Relation(table_name),
target,
options,
..
}) if options.contains(&CONVERT_TO_DELTA) => {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable {
37 changes: 21 additions & 16 deletions src/context/physical.rs
@@ -38,7 +38,7 @@ use datafusion::{
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
sql::TableReference,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column as ColumnExpr, ResolvedTableReference, SchemaReference};
use datafusion_expr::logical_plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
@@ -279,21 +279,26 @@ impl SeafowlContext {
// are qualified thanks to https://github.com/apache/arrow-datafusion/pull/7316
//
// This leads to a panic unless we strip out the qualifier first.
filter.push(predicate.clone().transform(&|expr| {
Ok(
if let Expr::Column(ColumnExpr {
relation: Some(_),
name,
}) = &expr
{
Transformed::Yes(Expr::Column(
ColumnExpr::new_unqualified(name),
))
} else {
Transformed::No(expr)
},
)
})?);
filter.push(
predicate
.clone()
.transform(&|expr| {
Ok(
if let Expr::Column(ColumnExpr {
relation: Some(_),
name,
}) = &expr
{
Transformed::yes(Expr::Column(
ColumnExpr::new_unqualified(name),
))
} else {
Transformed::no(expr)
},
)
})
.data()?,
);

// A WHERE clause has been used; employ it to prune the update down to only
// a subset of files, while inheriting the rest from the previous version
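Note: DataFusion 37 also replaces the `Transformed::Yes`/`Transformed::No` variants with the `Transformed::yes`/`Transformed::no` constructors, and `TreeNode::transform` now returns a `Transformed` wrapper that is unwrapped with `TransformedResult::data()`, as in the rewritten block above. A minimal sketch under the same assumption (the `flip_true_literals` helper is hypothetical):

```rust
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{lit, Expr};

/// Hypothetical rewrite: replace every literal `true` with `false`,
/// leaving all other nodes untouched.
fn flip_true_literals(expr: Expr) -> Result<Expr> {
    expr.transform(&|e| {
        Ok(
            if matches!(&e, Expr::Literal(ScalarValue::Boolean(Some(true)))) {
                // Node was rewritten (formerly `Transformed::Yes`).
                Transformed::yes(lit(false))
            } else {
                // Node is unchanged (formerly `Transformed::No`).
                Transformed::no(e)
            },
        )
    })
    // `.data()` from `TransformedResult` extracts the rewritten expression.
    .data()
}
```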
7 changes: 7 additions & 0 deletions src/datafusion/parser.rs
@@ -182,6 +182,10 @@ impl<'a> DFParser<'a> {
source: CopyToSource::Relation(table_name),
target: location,
options: vec![CONVERT_TO_DELTA.clone()],
partitioned_by: vec![],
has_header: false,

stored_as: None,
}))
}

@@ -238,6 +242,9 @@ impl<'a> DFParser<'a> {
source,
target,
options,
partitioned_by: vec![],
has_header: false,
stored_as: None,
}))
}

28 changes: 13 additions & 15 deletions src/frontend/http.rs
@@ -6,7 +6,7 @@ use std::time::Instant;
use std::{net::SocketAddr, sync::Arc};
use warp::{hyper, Rejection};

use arrow::json::writer::record_batches_to_json_rows;
use arrow::json::writer::{LineDelimited, WriterBuilder};
use arrow::record_batch::RecordBatch;
#[cfg(feature = "frontend-arrow-flight")]
use arrow_flight::flight_service_client::FlightServiceClient;
@@ -18,7 +18,7 @@ use bytes::Buf;
use datafusion::datasource::DefaultTableSource;

use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::FileType;
use datafusion_expr::logical_plan::{LogicalPlan, TableScan};
use deltalake::parquet::data_type::AsBytes;
@@ -68,12 +68,12 @@ struct ETagBuilderVisitor {
}

impl TreeNodeVisitor for ETagBuilderVisitor {
type N = LogicalPlan;
type Node = LogicalPlan;

fn pre_visit(
fn f_down(
&mut self,
plan: &LogicalPlan,
) -> Result<VisitRecursion, DataFusionError> {
) -> Result<TreeNodeRecursion, DataFusionError> {
if let LogicalPlan::TableScan(TableScan { source, .. }) = plan {
// TODO handle external Parquet tables too
if let Some(default_table_source) =
Expand All @@ -91,7 +91,7 @@ impl TreeNodeVisitor for ETagBuilderVisitor {
}
}
}
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}
}

@@ -130,14 +130,12 @@ struct QueryBody {
fn batch_to_json(
maybe_batch: Result<RecordBatch, DataFusionError>,
) -> Result<Vec<u8>, ArrowError> {
let batch = maybe_batch?;
let mut buf = Vec::new();
for row in record_batches_to_json_rows(&[&maybe_batch?])? {
buf.extend(
serde_json::to_vec(&row)
.map_err(|error| ArrowError::JsonError(error.to_string()))?,
);
buf.push(b'\n');
}
let mut writer = WriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, LineDelimited>(&mut buf);
writer.write(&batch)?;
Ok(buf)
}
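Note: with the arrow 51 upgrade, `batch_to_json` moves off `record_batches_to_json_rows` to the `WriterBuilder`/`LineDelimited` writer, as shown above. A self-contained sketch of that API, assuming arrow 51 (the `to_ndjson` helper is hypothetical):

```rust
use arrow::error::ArrowError;
use arrow::json::writer::{LineDelimited, WriterBuilder};
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: serialize a RecordBatch as newline-delimited JSON.
fn to_ndjson(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        // Emit `"field": null` instead of omitting null-valued keys.
        .with_explicit_nulls(true)
        .build::<_, LineDelimited>(&mut buf);
    writer.write(batch)?;
    writer.finish()?;
    drop(writer); // release the mutable borrow on `buf`
    Ok(buf)
}
```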

@@ -1220,7 +1218,7 @@ pub mod tests {
let error_msg = String::from_utf8_lossy(resp.body());
assert_eq!(
error_msg,
"Json error: data type Decimal128(38, 10) not supported in nested map for json writer"
"Invalid argument error: JSON Writer does not support data type: Decimal128(38, 10)"
);
}

@@ -1681,7 +1679,7 @@ SELECT
assert_eq!(
resp.body(),
&Bytes::from(
r#"{"bigint_val":1000000000,"bool_val":true,"char_val":"c","date_val":"2022-01-01","double_val":12.345678910111213,"float_val":12.345,"int_array_val":[1,2,3,4,5],"integer_val":1000000,"real_val":12.345,"smallint_val":1000,"string_val":"string","text_array_val":["one","two"],"text_val":"text","timestamp_val":"2022-01-01T12:03:11.123456","tinyint_val":1,"varchar_val":"varchar"}
r#"{"tinyint_val":1,"smallint_val":1000,"integer_val":1000000,"bigint_val":1000000000,"char_val":"c","varchar_val":"varchar","text_val":"text","string_val":"string","float_val":12.345,"real_val":12.345,"double_val":12.345678910111213,"bool_val":true,"date_val":"2022-01-01","timestamp_val":"2022-01-01T12:03:11.123456","int_array_val":[1,2,3,4,5],"text_array_val":["one","two"]}
"#
)
);