Skip to content

Commit

Permalink
Non-windowed updating aggregates using datafusion.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Apr 12, 2024
1 parent 15ebc97 commit beec1f6
Show file tree
Hide file tree
Showing 29 changed files with 1,328 additions and 189 deletions.
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0.0/json'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '0.9.0/put_part_api'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
1 change: 1 addition & 0 deletions crates/arroyo-datastream/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub enum OperatorName {
TumblingWindowAggregate,
SlidingWindowAggregate,
SessionWindowAggregate,
UpdatingAggregate,
ConnectorSource,
ConnectorSink,
}
Expand Down
20 changes: 15 additions & 5 deletions crates/arroyo-df/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
use petgraph::graph::{DiGraph, NodeIndex};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tracing::info;

use crate::extension::debezium::{DEBEZIUM_UNROLLING_EXTENSION_NAME, TO_DEBEZIUM_EXTENSION_NAME};
use crate::extension::key_calculation::KeyCalculationExtension;
Expand Down Expand Up @@ -124,6 +125,7 @@ impl<'a> Planner<'a> {
&self,
key_indices: Vec<usize>,
aggregate: &LogicalPlan,
add_timestamp_field: bool,
) -> DFResult<SplitPlanOutput> {
let physical_plan = self.sync_plan(aggregate)?;
let codec = ArroyoPhysicalExtensionCodec {
Expand Down Expand Up @@ -173,11 +175,18 @@ impl<'a> Planner<'a> {
physical_plan_type: Some(PhysicalPlanType::Aggregate(final_aggregate_proto)),
};

let partial_schema = ArroyoSchema::new_keyed(
add_timestamp_field_arrow(partial_schema.clone()),
partial_schema.fields().len(),
key_indices,
);
info!("partial schema begins as :{:?}", partial_schema);

let (partial_schema, timestamp_index) = if add_timestamp_field {
(
add_timestamp_field_arrow(partial_schema.clone()),
partial_schema.fields().len(),
)
} else {
(partial_schema.clone(), partial_schema.fields().len() - 1)
};

let partial_schema = ArroyoSchema::new_keyed(partial_schema, timestamp_index, key_indices);

Ok(SplitPlanOutput {
partial_aggregation_plan,
Expand Down Expand Up @@ -362,6 +371,7 @@ impl<'a> TreeNodeVisitor for PlanToGraphVisitor<'a> {
} else {
vec![]
};
info!("building node: {:?}", node);
self.build_extension(input_nodes, arroyo_extension)
.map_err(|e| DataFusionError::Plan(format!("error building extension: {}", e)))?;

Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-df/src/extension/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl AggregateExtension {
partial_aggregation_plan,
partial_schema,
finish_plan,
} = planner.split_physical_plan(self.key_fields.clone(), &self.aggregate)?;
} = planner.split_physical_plan(self.key_fields.clone(), &self.aggregate, true)?;

let final_physical_plan = planner.sync_plan(&self.final_calculation)?;
let final_physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
Expand Down Expand Up @@ -126,7 +126,7 @@ impl AggregateExtension {
partial_aggregation_plan,
partial_schema,
finish_plan,
} = planner.split_physical_plan(self.key_fields.clone(), &self.aggregate)?;
} = planner.split_physical_plan(self.key_fields.clone(), &self.aggregate, true)?;

let final_physical_plan = planner.sync_plan(&self.final_calculation)?;
let final_physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
Expand Down Expand Up @@ -251,7 +251,7 @@ impl AggregateExtension {
partial_aggregation_plan,
partial_schema,
finish_plan,
} = planner.split_physical_plan(self.key_fields.clone(), &self.aggregate)?;
} = planner.split_physical_plan(self.key_fields.clone(), &self.aggregate, true)?;

let config = TumblingWindowAggregateOperator {
name: "InstantWindow".to_string(),
Expand Down
68 changes: 67 additions & 1 deletion crates/arroyo-df/src/extension/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::sync::Arc;

use anyhow::Result;
use arrow_schema::{DataType, TimeUnit};
use arroyo_datastream::logical::{LogicalEdge, LogicalNode};
use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef};
use datafusion_common::{DFSchemaRef, DataFusionError, OwnedTableReference, Result as DFResult};
use arroyo_rpc::{IS_RETRACT_FIELD, TIMESTAMP_FIELD};
use datafusion_common::{
DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result as DFResult,
};
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore};
use watermark_node::WatermarkNode;

Expand All @@ -15,6 +19,7 @@ use self::debezium::{
DebeziumUnrollingExtension, ToDebeziumExtension, DEBEZIUM_UNROLLING_EXTENSION_NAME,
TO_DEBEZIUM_EXTENSION_NAME,
};
use self::updating_aggregate::{UpdatingAggregateExtension, UPDATING_AGGREGATE_EXTENSION_NAME};
use self::{
aggregate::{AggregateExtension, AGGREGATE_EXTENSION_NAME},
join::JOIN_NODE_NAME,
Expand All @@ -33,6 +38,7 @@ pub(crate) mod key_calculation;
pub(crate) mod remote_table;
pub(crate) mod sink;
pub(crate) mod table_source;
pub(crate) mod updating_aggregate;
pub(crate) mod watermark_node;
pub(crate) mod window_fn;
pub(crate) trait ArroyoExtension {
Expand Down Expand Up @@ -117,6 +123,13 @@ impl<'a> TryFrom<&'a dyn UserDefinedLogicalNode> for &'a dyn ArroyoExtension {
node.as_any().downcast_ref::<ToDebeziumExtension>().unwrap();
Ok(to_debezium_extension as &dyn ArroyoExtension)
}
UPDATING_AGGREGATE_EXTENSION_NAME => {
let updating_aggregate_extension = node
.as_any()
.downcast_ref::<UpdatingAggregateExtension>()
.unwrap();
Ok(updating_aggregate_extension as &dyn ArroyoExtension)
}
other => Err(DataFusionError::Plan(format!("unexpected node: {}", other))),
}
}
Expand Down Expand Up @@ -186,3 +199,56 @@ impl UserDefinedLogicalNodeCore for TimestampAppendExtension {
Self::new(inputs[0].clone(), self.qualifier.clone())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct IsRetractExtension {
pub(crate) input: LogicalPlan,
pub(crate) schema: DFSchemaRef,
}

impl IsRetractExtension {
pub(crate) fn new(input: LogicalPlan) -> Self {
let mut output_fields = input.schema().fields().clone();
let timestamp_index = output_fields.len() - 1;
output_fields[timestamp_index] = DFField::new_unqualified(
TIMESTAMP_FIELD,
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
);
output_fields.push(DFField::new_unqualified(
IS_RETRACT_FIELD,
DataType::Boolean,
false,
));
let schema = Arc::new(
DFSchema::new_with_metadata(output_fields, input.schema().metadata().clone()).unwrap(),
);
Self { input, schema }
}
}

impl UserDefinedLogicalNodeCore for IsRetractExtension {
fn name(&self) -> &str {
"IsRetractExtension"
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
}

fn schema(&self) -> &DFSchemaRef {
&self.schema
}

fn expressions(&self) -> Vec<Expr> {
vec![]
}

fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "IsRetractExtension")
}

fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
Self::new(inputs[0].clone())
}
}
Loading

0 comments on commit beec1f6

Please sign in to comment.