Skip to content

Commit

Permalink
Move unpacking of the source projection into QueryToGraphVisitor's mutate().
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Jan 26, 2024
1 parent 9ba2fe1 commit bc387c4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 109 deletions.
54 changes: 28 additions & 26 deletions arroyo-df/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,23 +384,19 @@ impl ArroyoSchemaProvider {
.get_table(name)
.ok_or_else(|| DataFusionError::Plan(format!("Table {} not found", name)))?;

let mut table_fields = vec![];
let fields_from_table = table.get_fields();
for field in fields {
let t_field = fields_from_table
.iter()
.find(|t_field| t_field.name() == field.name())
.ok_or_else(|| {
return DataFusionError::Plan(format!(
"Field {} not found in table {}",
field.name(),
name
));
})?;
table_fields.push(t_field.clone());
}
let fields = table
.get_fields()
.iter()
.filter_map(|field| {
if fields.contains(field) {
Some(field.clone())
} else {
None
}
})
.collect::<Vec<_>>();

let schema = Arc::new(Schema::new_with_metadata(table_fields, HashMap::new()));
let schema = Arc::new(Schema::new_with_metadata(fields, HashMap::new()));
Ok(create_table(name.to_string(), schema))
}
}
Expand Down Expand Up @@ -593,7 +589,7 @@ impl LogicalPlanExtension {
} => Some(inner_plan),
LogicalPlanExtension::AggregateCalculation(_) => None,
LogicalPlanExtension::Sink { .. } => None,
LogicalPlanExtension::WatermarkNode(n) => Some(&n.input),
LogicalPlanExtension::WatermarkNode(n) => None,
}
}

Expand Down Expand Up @@ -957,8 +953,7 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
LogicalPlan::TableScan(table_scan) => {
if table_scan.projection.is_some() {
return Err(DataFusionError::Internal(
"Unexpected projection in table scan that should have been removed by SourceRewriter"
.to_string(),
"Unexpected projection in table scan".to_string(),
));
}

Expand Down Expand Up @@ -994,17 +989,23 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
Ok(interred_plan.clone())
}
LogicalPlan::Extension(extension) => {
let Some(watermark_node) = extension.node.as_any().downcast_ref::<WatermarkNode>()
else {
return Err(DataFusionError::Plan(
"Logical plan extension must be a watermark node".into(),
));
};
let watermark_node = extension
.node
.as_any()
.downcast_ref::<WatermarkNode>()
.unwrap()
.clone();

let index = self
.local_logical_plan_graph
.add_node(LogicalPlanExtension::WatermarkNode(watermark_node.clone()));

let input = LogicalPlanExtension::ValueCalculation(watermark_node.input);
let edge = input.outgoing_edge();
let input_index = self.local_logical_plan_graph.add_node(input);
self.local_logical_plan_graph
.add_edge(input_index, index, edge);

let table_name = format!("{}", index.index());

Ok(LogicalPlan::TableScan(TableScan {
Expand Down Expand Up @@ -1110,7 +1111,8 @@ pub async fn parse_and_get_arrow_program(
schema_provider: schema_provider.clone(),
})?
.rewrite(&mut TimestampRewriter {})?
.rewrite(&mut rewriter)?;
.rewrite(&mut rewriter)
.unwrap();

println!("REWRITE: {}", plan_rewrite.display_graphviz());

Expand Down
54 changes: 5 additions & 49 deletions arroyo-df/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use crate::{
};
use crate::{tables::Table, ArroyoSchemaProvider, CompiledSql};
use anyhow::{anyhow, bail, Context, Result};
use arroyo_datastream::logical::{
LogicalEdge, LogicalEdgeType, LogicalGraph, LogicalNode, LogicalProgram, OperatorName,
};
use arroyo_datastream::logical::{LogicalGraph, LogicalNode, LogicalProgram, OperatorName};
use arroyo_rpc::grpc::api::{
KeyPlanOperator, SessionWindowAggregateOperator, SlidingWindowAggregateOperator,
ValuePlanOperator,
Expand All @@ -36,8 +34,6 @@ use arroyo_rpc::{
};
use datafusion_common::{DFSchema, DFSchemaRef, ScalarValue};
use datafusion_expr::{expr::ScalarFunction, BuiltinScalarFunction, Expr, LogicalPlan};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_proto::{
physical_plan::AsExecutionPlan,
protobuf::{
Expand Down Expand Up @@ -329,46 +325,14 @@ impl Planner {
sink_index
}
crate::LogicalPlanExtension::WatermarkNode(watermark_node) => {
let expression = create_physical_expr(
let expression = self.planner.create_physical_expr(
&watermark_node.watermark_expression.as_ref().unwrap(),
&watermark_node.schema,
&watermark_node.schema.as_ref().into(),
&ExecutionProps::new(),
&self.session_state,
)?;

let expression = PhysicalExprNode::try_from(expression)?;
let schema = ArroyoSchema {
schema: Arc::new(watermark_node.schema.as_ref().into()),
timestamp_index: watermark_node.schema.fields().len() - 1, // TODO: is this correct?
key_indices: vec![],
};

let projection_execution_plan = self
.planner
.create_physical_plan(&watermark_node.input.clone(), &self.session_state)
.await
.context("creating physical plan for watermark input value calculation")?;

let projection_physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
projection_execution_plan,
&ArroyoPhysicalExtensionCodec::default(),
)?;

let projection_node_config = ValuePlanOperator {
name: "proj".into(),
physical_plan: projection_physical_plan_node.encode_to_vec(),
};

let projection_node_index = program_graph.add_node(LogicalNode {
operator_id: format!("value_{}", program_graph.node_count()),
description: format!(
"pre_watermark_projection<{}>",
projection_node_config.name
),
operator_name: OperatorName::ArrowValue,
operator_config: projection_node_config.encode_to_vec(),
parallelism: 1,
});

let watermark_index = program_graph.add_node(LogicalNode {
operator_id: format!("watermark_{}", program_graph.node_count()),
Expand All @@ -379,21 +343,13 @@ impl Planner {
period_micros: 1_000_000,
idle_time_micros: None,
expression: expression.encode_to_vec(),
input_schema: Some(schema.clone().try_into()?),
input_schema: Some(watermark_node.arroyo_schema().try_into()?),
}
.encode_to_vec(),
});

let edge = LogicalEdge {
edge_type: LogicalEdgeType::Forward,
schema,
projection: None,
};

program_graph.add_edge(projection_node_index, watermark_index, edge);

node_mapping.insert(node_index, watermark_index);
projection_node_index
watermark_index
}
};

Expand Down
48 changes: 17 additions & 31 deletions arroyo-df/src/source_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,16 @@ impl SourceRewriter {
.ok_or_else(|| {
DataFusionError::Plan(format!("Watermark field {} not found", watermark_field))
})?,
None => {
// If no watermark field is present, calculate it as a fixed lateness duration of 1 second
Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "_timestamp".to_string(),
})),
op: datafusion_expr::Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::DurationNanosecond(Some(
Duration::from_secs(1).as_nanos() as i64,
)))),
})
}
None => Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "_timestamp".to_string(),
})),
op: datafusion_expr::Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::DurationNanosecond(Some(
Duration::from_secs(1).as_nanos() as i64,
)))),
}),
};
Ok(expr)
}
Expand Down Expand Up @@ -161,24 +158,13 @@ impl TreeNodeRewriter for SourceRewriter {
let Table::ConnectorTable(table) = table else {
return Ok(node);
};

let watermark_schema = DFSchema::try_from_qualified_schema(
table_scan.table_name.clone(),
&Schema::new(
table
.fields
.iter()
.map(|field| field.field().clone())
.collect::<Vec<_>>()
.clone(),
),
)?;

let watermark_node = WatermarkNode {
input: self.projection(&table_scan, table)?,
watermark_expression: Some(Self::watermark_expression(table)?),
schema: Arc::new(watermark_schema),
};
let watermark_node = WatermarkNode::new(
self.projection(&table_scan, table)?,
Some(Self::watermark_expression(table)?),
)
.map_err(|err| {
DataFusionError::Internal(format!("failed to create watermark expression: {}", err))
})?;

return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(watermark_node),
Expand Down
35 changes: 33 additions & 2 deletions arroyo-df/src/watermark_node.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::schemas::add_timestamp_field;
use anyhow::{anyhow, Result};
use arroyo_rpc::ArroyoSchema;
use datafusion_common::DFSchemaRef;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use std::fmt::Formatter;
use std::{fmt::Formatter, sync::Arc};

/// User-defined logical plan node that attaches a watermark expression to an input plan.
///
/// Instances are normally built through `WatermarkNode::new`, which derives `schema`
/// and `timestamp_index` from the input plan.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WatermarkNode {
    /// The plan this node is built on top of.
    pub input: LogicalPlan,
    /// Expression used for the watermark.
    // NOTE(review): the planner currently calls `.unwrap()` on this field, so `None`
    // appears to be unsupported downstream — confirm before constructing one without it.
    pub watermark_expression: Option<Expr>,
    /// Schema of this node: the input's schema after `add_timestamp_field` is applied.
    pub schema: DFSchemaRef,
    /// Index of the `_timestamp` column within `schema`.
    timestamp_index: usize,
}

impl UserDefinedLogicalNodeCore for WatermarkNode {
Expand Down Expand Up @@ -38,11 +41,39 @@ impl UserDefinedLogicalNodeCore for WatermarkNode {
fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert_eq!(inputs.len(), 1, "input size inconsistent");
assert_eq!(exprs.len(), 1, "expression size inconsistent");
let schema = add_timestamp_field(self.input.schema().clone()).unwrap();
let timestamp_index = schema
.index_of_column_by_name(None, "_timestamp")
.unwrap()
.unwrap();

Self {
input: inputs[0].clone(),
watermark_expression: Some(exprs[0].clone()),
schema: add_timestamp_field(self.input.schema().clone()).unwrap(),
schema,
timestamp_index,
}
}
}

impl WatermarkNode {
    /// Builds a watermark node on top of `input`.
    ///
    /// The node's schema is `input`'s schema passed through `add_timestamp_field`.
    /// The position of the `_timestamp` column in that schema is looked up once
    /// (by name, with no relation qualifier) and stored on the node.
    ///
    /// # Errors
    ///
    /// Returns an error if `add_timestamp_field` fails, if the column lookup fails,
    /// or if the lookup finds no `_timestamp` column.
    pub(crate) fn new(input: LogicalPlan, watermark_expression: Option<Expr>) -> Result<Self> {
        let schema = add_timestamp_field(input.schema().clone())?;

        let Some(timestamp_index) = schema.index_of_column_by_name(None, "_timestamp")? else {
            return Err(anyhow!("missing _timestamp column"));
        };

        Ok(Self {
            input,
            watermark_expression,
            schema,
            timestamp_index,
        })
    }

    /// Returns the `ArroyoSchema` for this node: the node's schema converted via
    /// `into()`, the stored timestamp index, and an empty list of key indices.
    pub(crate) fn arroyo_schema(&self) -> ArroyoSchema {
        let arrow_schema = Arc::new(self.schema.as_ref().into());
        ArroyoSchema::new(arrow_schema, self.timestamp_index, vec![])
    }
}
2 changes: 1 addition & 1 deletion arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub const TIMESTAMP_FIELD: &str = "_timestamp";

pub type ArroyoSchemaRef = Arc<ArroyoSchema>;

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ArroyoSchema {
pub schema: Arc<Schema>,
pub timestamp_index: usize,
Expand Down

0 comments on commit bc387c4

Please sign in to comment.