Skip to content

Commit

Permalink
[sql] Use JSON contrib functions (#2540)
Browse files Browse the repository at this point in the history
This commit adds few common JSON functions as UDFs,
to use the latest version, this commit also updates datafusion to 0.44
  • Loading branch information
igalshilman authored Jan 24, 2025
1 parent edd8921 commit 94be2a4
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 111 deletions.
357 changes: 271 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ cling = { version = "0.1", default-features = false, features = ["derive"] }
criterion = "0.5"
crossterm = { version = "0.27.0" }
dashmap = { version = "6" }
datafusion = { version = "42.0.0", default-features = false, features = [
datafusion = { version = "44.0.0", default-features = false, features = [
"crypto_expressions",
"encoding_expressions",
"regex_expressions",
"unicode_expressions",
] }
datafusion-expr = { version = "42.0.0" }
datafusion-expr = { version = "44.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
dialoguer = { version = "0.11.0" }
Expand Down Expand Up @@ -237,3 +237,8 @@ strip = true # Automatically strip symbols from the binary.
[profile.bench]
# Should be enabled for benchmarking runs; increases binary size
debug = true

[profile.dev.package.tikv-jemalloc-sys]
opt-level = 2


1 change: 1 addition & 0 deletions crates/storage-query-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bytestring = { workspace = true }
chrono = { workspace = true }
codederror = { workspace = true }
datafusion = { workspace = true }
datafusion-functions-json = { version = "0.44.1" }
derive_more = { workspace = true }
futures = { workspace = true }
paste = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/storage-query-datafusion/src/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use datafusion::logical_expr::{Join, LogicalPlan};
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::prelude::Expr;

#[derive(Debug)]
pub(crate) struct UseSymmetricHashJoinWhenPartitionKeyIsPresent;

impl UseSymmetricHashJoinWhenPartitionKeyIsPresent {
Expand Down
20 changes: 15 additions & 5 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ use codederror::CodedError;
use datafusion::catalog::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SQLOptions;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::sql::TableReference;

use restate_core::Metadata;
use restate_invoker_api::StatusHandle;
use restate_partition_store::PartitionStoreManager;
Expand All @@ -34,6 +33,7 @@ use restate_types::live::Live;
use restate_types::partition_table::Partition;
use restate_types::schema::deployment::DeploymentResolver;
use restate_types::schema::service::ServiceMetadataResolver;
use tracing::warn;

use crate::remote_query_scanner_manager::RemoteScannerManager;
use crate::table_providers::ScanPartition;
Expand Down Expand Up @@ -224,11 +224,13 @@ impl QueryContext {
//
// build the runtime
//
let mut runtime_config = RuntimeConfig::default().with_memory_limit(memory_limit, 1.0);
let mut runtime_config = RuntimeEnvBuilder::default();
runtime_config = runtime_config.with_memory_limit(memory_limit, 1.0);

if let Some(folder) = temp_folder {
runtime_config = runtime_config.with_temp_file_path(folder);
}
let runtime = Arc::new(RuntimeEnv::new(runtime_config).expect("runtime"));
let runtime = runtime_config.build_arc().expect("runtime");
//
// build the session
//
Expand Down Expand Up @@ -290,7 +292,15 @@ impl QueryContext {
state_builder.with_physical_optimizer_rules(default_physical_optimizer_rules);

let state = state_builder.build();
let ctx = SessionContext::new_with_state(state);

let mut ctx = SessionContext::new_with_state(state);

match datafusion_functions_json::register_all(&mut ctx) {
Ok(_) => {}
Err(err) => {
warn!("Unable to register json functions {}", err);
}
};

let sql_options = SQLOptions::new()
.with_allow_ddl(false)
Expand Down
11 changes: 10 additions & 1 deletion crates/storage-query-datafusion/src/partition_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use datafusion::common::ScalarValue;
use datafusion::logical_expr::{col, BinaryExpr, Expr, Operator};
use restate_types::identifiers::partitioner::HashPartitioner;
use restate_types::identifiers::{InvocationId, PartitionKey, WithPartitionKey};
use std::fmt::{Debug, Formatter};
use std::str::FromStr;

pub trait PartitionKeyExtractor: Send + Sync + 'static {
pub trait PartitionKeyExtractor: Send + Sync + 'static + Debug {
fn try_extract(&self, filters: &[Expr]) -> anyhow::Result<Option<PartitionKey>>;
}

#[derive(Debug)]
pub struct FirstMatchingPartitionKeyExtractor {
extractors: Vec<Box<dyn PartitionKeyExtractor>>,
}
Expand Down Expand Up @@ -72,6 +74,12 @@ pub(crate) struct MatchingColumnExtractor<F> {
extractor: F,
}

impl<F> Debug for MatchingColumnExtractor<F> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("MatchingColumnExtractor{:?}", self.column))
}
}

impl<F> MatchingColumnExtractor<F> {
pub(crate) fn new(column_name: impl Into<String>, extractor: F) -> Self {
let column = col(column_name.into());
Expand Down Expand Up @@ -103,6 +111,7 @@ where
}
}

#[derive(Debug)]
struct IdentityPartitionKeyExtractor(Expr);

impl IdentityPartitionKeyExtractor {
Expand Down
13 changes: 3 additions & 10 deletions crates/storage-query-datafusion/src/physical_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use datafusion::physical_plan::joins::{
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct JoinRewrite;

impl JoinRewrite {
Expand Down Expand Up @@ -60,16 +61,8 @@ impl PhysicalOptimizerRule for JoinRewrite {
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.null_equals_null(),
hash_join
.left()
.properties()
.output_ordering()
.map(|s| s.to_vec()),
hash_join
.right()
.properties()
.output_ordering()
.map(|s| s.to_vec()),
hash_join.left().properties().output_ordering().cloned(),
hash_join.right().properties().output_ordering().cloned(),
StreamJoinPartitionMode::Partitioned,
) else {
return Ok(Transformed::no(plan));
Expand Down
12 changes: 9 additions & 3 deletions crates/storage-query-datafusion/src/table_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream,
};
use restate_types::identifiers::{PartitionId, PartitionKey};
Expand All @@ -39,6 +40,7 @@ pub trait ScanPartition: Send + Sync + Debug + 'static {
) -> anyhow::Result<SendableRecordBatchStream>;
}

#[derive(Debug)]
pub(crate) struct PartitionedTableProvider<T, S> {
partition_selector: S,
schema: SchemaRef,
Expand Down Expand Up @@ -136,7 +138,8 @@ where
let plan = PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(required_partitions.len()),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
);

Ok(Arc::new(PartitionedExecutionPlan {
Expand Down Expand Up @@ -248,6 +251,8 @@ pub(crate) trait Scan: Debug + Send + Sync + 'static {
}

pub(crate) type ScannerRef = Arc<dyn Scan>;

#[derive(Debug)]
pub(crate) struct GenericTableProvider {
schema: SchemaRef,
scanner: ScannerRef,
Expand Down Expand Up @@ -327,7 +332,8 @@ impl GenericExecutionPlan {
let plan_properties = PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
);

Self {
Expand Down
8 changes: 4 additions & 4 deletions crates/storage-query-datafusion/src/table_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
use std::fmt::Write;
use tracing::error;

Expand All @@ -28,11 +28,11 @@ macro_rules! log_data_corruption_error {
};
}

pub(crate) fn compute_ordering(schema: SchemaRef) -> Option<Vec<PhysicalSortExpr>> {
let ordering = vec![PhysicalSortExpr {
pub(crate) fn compute_ordering(schema: SchemaRef) -> Option<LexOrdering> {
let ordering = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("partition_key", &schema).ok()?,
options: Default::default(),
}];
}]);

Some(ordering)
}
Expand Down

0 comments on commit 94be2a4

Please sign in to comment.