Skip to content

Commit

Permalink
Generate watermarks and set event time using expressions
Browse files Browse the repository at this point in the history
Modify the periodic watermark generator to accept a physical expressions
that it uses to calculate the watermark. The expression can be set via a
virtual field in the create table statement, or it defaults to a max
lateness expression. The operator is inserted via a user-defined logical
node.

Add an expression to the projection in the SourceRewriter to set the
event time if present.
  • Loading branch information
jbeisen committed Jan 26, 2024
1 parent 94a0206 commit 59b5879
Show file tree
Hide file tree
Showing 11 changed files with 512 additions and 238 deletions.
2 changes: 1 addition & 1 deletion arroyo-datastream/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use strum::{Display, EnumString};

#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumString, Display)]
pub enum OperatorName {
Watermark,
ExpressionWatermark,
ArrowValue,
ArrowKey,
ArrowAggregate,
Expand Down
44 changes: 44 additions & 0 deletions arroyo-df/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use std::collections::HashSet;
use std::fmt::Debug;

use crate::types::{interval_month_day_nanos_to_duration, rust_to_arrow, NullableType};
use crate::watermark_node::WatermarkNode;
use arroyo_datastream::logical::{LogicalEdge, LogicalEdgeType, LogicalProgram};
use arroyo_rpc::{ArroyoSchema, TIMESTAMP_FIELD};
use std::time::{Duration, SystemTime};
Expand All @@ -65,6 +66,7 @@ const DEFAULT_IDLE_TIME: Option<Duration> = Some(Duration::from_secs(5 * 60));

#[cfg(test)]
mod test;
mod watermark_node;

#[allow(unused)]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -576,6 +578,7 @@ enum LogicalPlanExtension {
name: String,
connector_op: ConnectorOp,
},
WatermarkNode(WatermarkNode),
}

impl LogicalPlanExtension {
Expand All @@ -590,6 +593,7 @@ impl LogicalPlanExtension {
} => Some(inner_plan),
LogicalPlanExtension::AggregateCalculation(_) => None,
LogicalPlanExtension::Sink { .. } => None,
LogicalPlanExtension::WatermarkNode(n) => Some(&n.input),
}
}

Expand Down Expand Up @@ -628,6 +632,9 @@ impl LogicalPlanExtension {

DataFusionEdge::new(output_schema, LogicalEdgeType::Forward, vec![]).unwrap()
}
LogicalPlanExtension::WatermarkNode(n) => {
DataFusionEdge::new(n.schema.clone(), LogicalEdgeType::Forward, vec![]).unwrap()
}
LogicalPlanExtension::Sink { .. } => unreachable!(),
}
}
Expand Down Expand Up @@ -986,6 +993,43 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
};
Ok(interred_plan.clone())
}
LogicalPlan::Extension(extension) => {
let Some(watermark_node) = extension.node.as_any().downcast_ref::<WatermarkNode>()
else {
return Err(DataFusionError::Plan(
"Logical plan extension must be a watermark node".into(),
));
};

let index = self
.local_logical_plan_graph
.add_node(LogicalPlanExtension::WatermarkNode(watermark_node.clone()));

let table_name = format!("{}", index.index());

Ok(LogicalPlan::TableScan(TableScan {
table_name: OwnedTableReference::partial("arroyo-virtual", table_name.clone()),
source: create_table_with_timestamp(
OwnedTableReference::partial("arroyo-virtual", table_name).to_string(),
watermark_node
.schema
.fields()
.iter()
.map(|field| {
Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
))
})
.collect(),
),
projection: None,
projected_schema: watermark_node.schema.clone(),
filters: vec![],
fetch: None,
}))
}
other => Ok(other),
}
}
Expand Down
100 changes: 72 additions & 28 deletions arroyo-df/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::info;
use crate::{
physical::{ArroyoMemExec, ArroyoPhysicalExtensionCodec, DecodingContext, EmptyRegistry},
schemas::add_timestamp_field_arrow,
AggregateCalculation, DataFusionEdge, QueryToGraphVisitor,
AggregateCalculation, QueryToGraphVisitor,
};
use crate::{tables::Table, ArroyoSchemaProvider, CompiledSql};
use anyhow::{anyhow, bail, Context, Result};
Expand All @@ -36,6 +36,8 @@ use arroyo_rpc::{
};
use datafusion_common::{DFSchema, DFSchemaRef, ScalarValue};
use datafusion_expr::{expr::ScalarFunction, BuiltinScalarFunction, Expr, LogicalPlan};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_proto::{
physical_plan::AsExecutionPlan,
protobuf::{
Expand Down Expand Up @@ -112,33 +114,8 @@ impl Planner {
parallelism: 1,
});

let watermark_index = program_graph.add_node(LogicalNode {
operator_id: format!("watermark_{}", program_graph.node_count()),
description: "watermark".to_string(),
operator_name: OperatorName::Watermark,
parallelism: 1,
operator_config: api::PeriodicWatermark {
period_micros: 1_000_000,
max_lateness_micros: 0,
idle_time_micros: None,
}
.encode_to_vec(),
});

let mut edge: LogicalEdge = (&DataFusionEdge::new(
table_scan.projected_schema.clone(),
LogicalEdgeType::Forward,
vec![],
)
.unwrap())
.into();

edge.projection = table_scan.projection.clone();

program_graph.add_edge(source_index, watermark_index, edge);

node_mapping.insert(node_index, watermark_index);
watermark_index
node_mapping.insert(node_index, source_index);
source_index
}
crate::LogicalPlanExtension::ValueCalculation(logical_plan) => {
let _inputs = logical_plan.inputs();
Expand Down Expand Up @@ -351,6 +328,73 @@ impl Planner {
node_mapping.insert(node_index, sink_index);
sink_index
}
crate::LogicalPlanExtension::WatermarkNode(watermark_node) => {
let expression = create_physical_expr(
&watermark_node.watermark_expression.as_ref().unwrap(),
&watermark_node.schema,
&watermark_node.schema.as_ref().into(),
&ExecutionProps::new(),
)?;

let expression = PhysicalExprNode::try_from(expression)?;
let schema = ArroyoSchema {
schema: Arc::new(watermark_node.schema.as_ref().into()),
timestamp_index: watermark_node.schema.fields().len() - 1, // TODO: is this correct?
key_indices: vec![],
};

let projection_execution_plan = self
.planner
.create_physical_plan(&watermark_node.input.clone(), &self.session_state)
.await
.context("creating physical plan for watermark input value calculation")?;

let projection_physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
projection_execution_plan,
&ArroyoPhysicalExtensionCodec::default(),
)?;

let projection_node_config = ValuePlanOperator {
name: "proj".into(),
physical_plan: projection_physical_plan_node.encode_to_vec(),
};

let projection_node_index = program_graph.add_node(LogicalNode {
operator_id: format!("value_{}", program_graph.node_count()),
description: format!(
"pre_watermark_projection<{}>",
projection_node_config.name
),
operator_name: OperatorName::ArrowValue,
operator_config: projection_node_config.encode_to_vec(),
parallelism: 1,
});

let watermark_index = program_graph.add_node(LogicalNode {
operator_id: format!("watermark_{}", program_graph.node_count()),
description: "watermark".to_string(),
operator_name: OperatorName::ExpressionWatermark,
parallelism: 1,
operator_config: api::ExpressionWatermarkConfig {
period_micros: 1_000_000,
idle_time_micros: None,
expression: expression.encode_to_vec(),
input_schema: Some(schema.clone().try_into()?),
}
.encode_to_vec(),
});

let edge = LogicalEdge {
edge_type: LogicalEdgeType::Forward,
schema,
projection: None,
};

program_graph.add_edge(projection_node_index, watermark_index, edge);

node_mapping.insert(node_index, watermark_index);
projection_node_index
}
};

for edge in rewriter
Expand Down
4 changes: 4 additions & 0 deletions arroyo-df/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub fn window_arrow_struct() -> DataType {
}

pub(crate) fn add_timestamp_field(schema: DFSchemaRef) -> DFResult<DFSchemaRef> {
if has_timestamp_field(schema.clone()) {
return Ok(schema);
}

let timestamp_field = DFField::new_unqualified(
"_timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
Expand Down
Loading

0 comments on commit 59b5879

Please sign in to comment.