diff --git a/arroyo-datastream/src/logical.rs b/arroyo-datastream/src/logical.rs
index 1161f890d..52a1d8f17 100644
--- a/arroyo-datastream/src/logical.rs
+++ b/arroyo-datastream/src/logical.rs
@@ -16,7 +16,7 @@ use strum::{Display, EnumString};
 #[derive(Clone, Copy, Debug, Eq, PartialEq, EnumString, Display)]
 pub enum OperatorName {
-    Watermark,
+    ExpressionWatermark,
     ArrowValue,
     ArrowKey,
     ArrowAggregate,
diff --git a/arroyo-df/src/lib.rs b/arroyo-df/src/lib.rs
index ad4362880..553d5c40c 100644
--- a/arroyo-df/src/lib.rs
+++ b/arroyo-df/src/lib.rs
@@ -53,6 +53,7 @@ use std::collections::HashSet;
 use std::fmt::Debug;
 
 use crate::types::{interval_month_day_nanos_to_duration, rust_to_arrow, NullableType};
+use crate::watermark_node::WatermarkNode;
 use arroyo_datastream::logical::{LogicalEdge, LogicalEdgeType, LogicalProgram};
 use arroyo_rpc::{ArroyoSchema, TIMESTAMP_FIELD};
 use std::time::{Duration, SystemTime};
@@ -65,6 +66,7 @@ const DEFAULT_IDLE_TIME: Option<Duration> = Some(Duration::from_secs(5 * 60));
 
 #[cfg(test)]
 mod test;
+mod watermark_node;
 
 #[allow(unused)]
 #[derive(Clone, Debug)]
@@ -576,6 +578,7 @@ enum LogicalPlanExtension {
         name: String,
         connector_op: ConnectorOp,
     },
+    WatermarkNode(WatermarkNode),
 }
 
 impl LogicalPlanExtension {
@@ -590,6 +593,7 @@ impl LogicalPlanExtension {
             } => Some(inner_plan),
             LogicalPlanExtension::AggregateCalculation(_) => None,
             LogicalPlanExtension::Sink { .. } => None,
+            LogicalPlanExtension::WatermarkNode(n) => Some(&n.input),
         }
     }
@@ -628,6 +632,9 @@ impl LogicalPlanExtension {
                 DataFusionEdge::new(output_schema, LogicalEdgeType::Forward, vec![]).unwrap()
             }
+            LogicalPlanExtension::WatermarkNode(n) => {
+                DataFusionEdge::new(n.schema.clone(), LogicalEdgeType::Forward, vec![]).unwrap()
+            }
             LogicalPlanExtension::Sink { .. } => unreachable!(),
         }
     }
@@ -986,6 +993,43 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
                 };
                 Ok(interred_plan.clone())
             }
+            LogicalPlan::Extension(extension) => {
+                let Some(watermark_node) = extension.node.as_any().downcast_ref::<WatermarkNode>()
+                else {
+                    return Err(DataFusionError::Plan(
+                        "Logical plan extension must be a watermark node".into(),
+                    ));
+                };
+
+                let index = self
+                    .local_logical_plan_graph
+                    .add_node(LogicalPlanExtension::WatermarkNode(watermark_node.clone()));
+
+                let table_name = format!("{}", index.index());
+
+                Ok(LogicalPlan::TableScan(TableScan {
+                    table_name: OwnedTableReference::partial("arroyo-virtual", table_name.clone()),
+                    source: create_table_with_timestamp(
+                        OwnedTableReference::partial("arroyo-virtual", table_name).to_string(),
+                        watermark_node
+                            .schema
+                            .fields()
+                            .iter()
+                            .map(|field| {
+                                Arc::new(Field::new(
+                                    field.name(),
+                                    field.data_type().clone(),
+                                    field.is_nullable(),
+                                ))
+                            })
+                            .collect(),
+                    ),
+                    projection: None,
+                    projected_schema: watermark_node.schema.clone(),
+                    filters: vec![],
+                    fetch: None,
+                }))
+            }
             other => Ok(other),
         }
     }
diff --git a/arroyo-df/src/plan_graph.rs b/arroyo-df/src/plan_graph.rs
index 8ce930ffb..29fef65bf 100644
--- a/arroyo-df/src/plan_graph.rs
+++ b/arroyo-df/src/plan_graph.rs
@@ -19,7 +19,7 @@ use tracing::info;
 use crate::{
     physical::{ArroyoMemExec, ArroyoPhysicalExtensionCodec, DecodingContext, EmptyRegistry},
     schemas::add_timestamp_field_arrow,
-    AggregateCalculation, DataFusionEdge, QueryToGraphVisitor,
+    AggregateCalculation, QueryToGraphVisitor,
 };
 use crate::{tables::Table, ArroyoSchemaProvider, CompiledSql};
 use anyhow::{anyhow, bail, Context, Result};
@@ -36,6 +36,8 @@ use arroyo_rpc::{
 };
 use datafusion_common::{DFSchema, DFSchemaRef, ScalarValue};
 use datafusion_expr::{expr::ScalarFunction, BuiltinScalarFunction, Expr, LogicalPlan};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_proto::{
     physical_plan::AsExecutionPlan,
     protobuf::{
@@ -112,33 +114,8 @@ impl Planner {
                     parallelism: 1,
                 });
 
-                let watermark_index = program_graph.add_node(LogicalNode {
-                    operator_id: format!("watermark_{}", program_graph.node_count()),
-                    description: "watermark".to_string(),
-                    operator_name: OperatorName::Watermark,
-                    parallelism: 1,
-                    operator_config: api::PeriodicWatermark {
-                        period_micros: 1_000_000,
-                        max_lateness_micros: 0,
-                        idle_time_micros: None,
-                    }
-                    .encode_to_vec(),
-                });
-
-                let mut edge: LogicalEdge = (&DataFusionEdge::new(
-                    table_scan.projected_schema.clone(),
-                    LogicalEdgeType::Forward,
-                    vec![],
-                )
-                .unwrap())
-                .into();
-
-                edge.projection = table_scan.projection.clone();
-
-                program_graph.add_edge(source_index, watermark_index, edge);
-
-                node_mapping.insert(node_index, watermark_index);
-                watermark_index
+                node_mapping.insert(node_index, source_index);
+                source_index
             }
             crate::LogicalPlanExtension::ValueCalculation(logical_plan) => {
                 let _inputs = logical_plan.inputs();
@@ -351,6 +328,73 @@ impl Planner {
                 node_mapping.insert(node_index, sink_index);
                 sink_index
             }
+            crate::LogicalPlanExtension::WatermarkNode(watermark_node) => {
+                let expression = create_physical_expr(
+                    &watermark_node.watermark_expression.as_ref().unwrap(),
+                    &watermark_node.schema,
+                    &watermark_node.schema.as_ref().into(),
+                    &ExecutionProps::new(),
+                )?;
+
+                let expression = PhysicalExprNode::try_from(expression)?;
+                let schema = ArroyoSchema {
+                    schema: Arc::new(watermark_node.schema.as_ref().into()),
+                    timestamp_index: watermark_node.schema.fields().len() - 1, // TODO: is this correct?
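+                    // Assumption (not verified in this diff): SourceRewriter's projection aliases
+                    // the event-time column to `_timestamp` and appends it as the last field, so
+                    // the last index is treated as the timestamp column of the watermark input.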
+                    key_indices: vec![],
+                };
+
+                let projection_execution_plan = self
+                    .planner
+                    .create_physical_plan(&watermark_node.input.clone(), &self.session_state)
+                    .await
+                    .context("creating physical plan for watermark input value calculation")?;
+
+                let projection_physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
+                    projection_execution_plan,
+                    &ArroyoPhysicalExtensionCodec::default(),
+                )?;
+
+                let projection_node_config = ValuePlanOperator {
+                    name: "proj".into(),
+                    physical_plan: projection_physical_plan_node.encode_to_vec(),
+                };
+
+                let projection_node_index = program_graph.add_node(LogicalNode {
+                    operator_id: format!("value_{}", program_graph.node_count()),
+                    description: format!(
+                        "pre_watermark_projection<{}>",
+                        projection_node_config.name
+                    ),
+                    operator_name: OperatorName::ArrowValue,
+                    operator_config: projection_node_config.encode_to_vec(),
+                    parallelism: 1,
+                });
+
+                let watermark_index = program_graph.add_node(LogicalNode {
+                    operator_id: format!("watermark_{}", program_graph.node_count()),
+                    description: "watermark".to_string(),
+                    operator_name: OperatorName::ExpressionWatermark,
+                    parallelism: 1,
+                    operator_config: api::ExpressionWatermarkConfig {
+                        period_micros: 1_000_000,
+                        idle_time_micros: None,
+                        expression: expression.encode_to_vec(),
+                        input_schema: Some(schema.clone().try_into()?),
+                    }
+                    .encode_to_vec(),
+                });
+
+                let edge = LogicalEdge {
+                    edge_type: LogicalEdgeType::Forward,
+                    schema,
+                    projection: None,
+                };
+
+                program_graph.add_edge(projection_node_index, watermark_index, edge);
+
+                node_mapping.insert(node_index, watermark_index);
+                projection_node_index
+            }
         };
 
         for edge in rewriter
diff --git a/arroyo-df/src/schemas.rs b/arroyo-df/src/schemas.rs
index f1e7221ab..8efeca749 100644
--- a/arroyo-df/src/schemas.rs
+++ b/arroyo-df/src/schemas.rs
@@ -22,6 +22,10 @@ pub fn window_arrow_struct() -> DataType {
 }
 
 pub(crate) fn add_timestamp_field(schema: DFSchemaRef) -> DFResult<DFSchemaRef> {
+    if has_timestamp_field(schema.clone()) {
+        return Ok(schema);
+    }
+
     let timestamp_field = DFField::new_unqualified(
         "_timestamp",
         DataType::Timestamp(TimeUnit::Nanosecond, None),
diff --git a/arroyo-df/src/source_rewriter.rs b/arroyo-df/src/source_rewriter.rs
index 3183739ce..a3ba94dac 100644
--- a/arroyo-df/src/source_rewriter.rs
+++ b/arroyo-df/src/source_rewriter.rs
@@ -1,39 +1,67 @@
+use crate::tables::ConnectorTable;
 use crate::tables::FieldSpec;
-use crate::tables::Table::ConnectorTable;
+use crate::tables::Table;
+use crate::watermark_node::WatermarkNode;
 use crate::ArroyoSchemaProvider;
 use arrow_schema::Schema;
 use datafusion_common::tree_node::TreeNodeRewriter;
-use datafusion_common::{Column, DFSchema, DataFusionError, Result as DFResult};
-use datafusion_expr::{Expr, LogicalPlan, TableScan};
+use datafusion_common::{
+    Column, DFSchema, DataFusionError, OwnedTableReference, Result as DFResult, ScalarValue,
+};
+use datafusion_expr::{BinaryExpr, Expr, Extension, LogicalPlan, TableScan};
 use std::sync::Arc;
+use std::time::Duration;
 
 /// Rewrites a logical plan to move projections out of table scans
-/// and into a separate projection node which may include virtual fields.
+/// and into a separate projection node which may include virtual fields,
+/// and adds a watermark node.
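+///
+/// Illustrative sketch (not actual planner output) of the rewritten plan for a
+/// connector table `events` whose event-time column is `event_time`:
+///
+/// ```text
+/// Extension(WatermarkNode)                                   -- watermark_expression evaluated by the worker
+///   Projection: events.id, events.event_time AS _timestamp   -- virtual fields expanded here
+///     TableScan: events                                       -- only non-virtual fields
+/// ```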
 pub struct SourceRewriter {
     pub(crate) schema_provider: ArroyoSchemaProvider,
 }
 
-impl TreeNodeRewriter for SourceRewriter {
-    type N = LogicalPlan;
-
-    fn mutate(&mut self, node: Self::N) -> DFResult<Self::N> {
-        let LogicalPlan::TableScan(table_scan) = node.clone() else {
-            return Ok(node);
-        };
-
-        let table_name = table_scan.table_name.table();
-        let table = self
-            .schema_provider
-            .get_table(table_name)
-            .ok_or_else(|| DataFusionError::Plan(format!("Table {} not found", table_name)))?;
-
-        let ConnectorTable(table) = table else {
-            return Ok(node);
+impl SourceRewriter {
+    fn watermark_expression(table: &ConnectorTable) -> DFResult<Expr> {
+        let expr = match table.watermark_field.clone() {
+            Some(watermark_field) => table
+                .fields
+                .iter()
+                .find_map(|f| {
+                    if f.field().name() == &watermark_field {
+                        return match f {
+                            FieldSpec::StructField(f) => Some(Expr::Column(Column {
+                                relation: None,
+                                name: f.name().to_string(),
+                            })),
+                            FieldSpec::VirtualField { expression, .. } => Some(expression.clone()),
+                        };
+                    }
+                    None
+                })
+                .ok_or_else(|| {
+                    DataFusionError::Plan(format!("Watermark field {} not found", watermark_field))
+                })?,
+            None => {
+                // If no watermark field is present, calculate it as a fixed lateness duration of 1 second
+                Expr::BinaryExpr(BinaryExpr {
+                    left: Box::new(Expr::Column(Column {
+                        relation: None,
+                        name: "_timestamp".to_string(),
+                    })),
+                    op: datafusion_expr::Operator::Minus,
+                    right: Box::new(Expr::Literal(ScalarValue::DurationNanosecond(Some(
+                        Duration::from_secs(1).as_nanos() as i64,
+                    )))),
+                })
+            }
         };
+        Ok(expr)
+    }
 
-        let qualifier = table_scan.table_name.clone();
-
-        let expressions = table
+    fn projection_expressions(
+        table: &ConnectorTable,
+        qualifier: &OwnedTableReference,
+    ) -> DFResult<Vec<Expr>> {
+        let mut expressions = table
             .fields
             .iter()
             .map(|field| match field {
@@ -47,6 +75,38 @@ impl TreeNodeRewriter for SourceRewriter {
             })
             .collect::<Vec<_>>();
 
+        // Add event time field if present
+        if let Some(event_time_field) = table.event_time_field.clone() {
+            let event_time_field = table
+                .fields
+                .iter()
+                .find_map(|f| {
+                    if f.field().name() == &event_time_field {
+                        return match f {
+                            FieldSpec::StructField(f) => Some(Expr::Column(Column {
+                                relation: Some(qualifier.clone()),
+                                name: f.name().to_string(),
+                            })),
+                            FieldSpec::VirtualField { expression, .. } => Some(expression.clone()),
+                        };
+                    }
+                    None
+                })
+                .ok_or_else(|| {
+                    DataFusionError::Plan(format!(
+                        "Event time field {} not found",
+                        event_time_field
+                    ))
+                })?;
+
+            let event_time_field = event_time_field.alias("_timestamp".to_string());
+            expressions.push(event_time_field);
+        };
+        Ok(expressions)
+    }
+
+    fn projection(&self, table_scan: &TableScan, table: &ConnectorTable) -> DFResult<LogicalPlan> {
+        let qualifier = table_scan.table_name.clone();
         let non_virtual_fields = table
             .fields
             .iter()
@@ -63,7 +123,7 @@ impl TreeNodeRewriter for SourceRewriter {
 
         let table_scan_table_source = self
            .schema_provider
-            .get_table_source_with_fields(table_name, non_virtual_fields)
+            .get_table_source_with_fields(&table.name, non_virtual_fields)
             .unwrap();
 
         let input_table_scan = LogicalPlan::TableScan(TableScan {
@@ -76,7 +136,52 @@ impl TreeNodeRewriter for SourceRewriter {
         });
 
         Ok(LogicalPlan::Projection(
-            datafusion_expr::Projection::try_new(expressions, Arc::new(input_table_scan))?,
+            datafusion_expr::Projection::try_new(
+                Self::projection_expressions(table, &qualifier)?,
+                Arc::new(input_table_scan),
+            )?,
         ))
     }
 }
+
+impl TreeNodeRewriter for SourceRewriter {
+    type N = LogicalPlan;
+
+    fn mutate(&mut self, node: Self::N) -> DFResult<Self::N> {
+        let LogicalPlan::TableScan(table_scan) = node.clone() else {
+            return Ok(node);
+        };
+
+        let table_name = table_scan.table_name.table();
+        let table = self
+            .schema_provider
+            .get_table(table_name)
+            .ok_or_else(|| DataFusionError::Plan(format!("Table {} not found", table_name)))?;
+
+        let Table::ConnectorTable(table) = table else {
+            return Ok(node);
+        };
+
+        let watermark_schema = DFSchema::try_from_qualified_schema(
+            table_scan.table_name.clone(),
+            &Schema::new(
+                table
+                    .fields
+                    .iter()
+                    .map(|field| field.field().clone())
+                    .collect::<Vec<_>>()
+                    .clone(),
+            ),
+        )?;
+
+        let watermark_node = WatermarkNode {
+            input: self.projection(&table_scan, table)?,
+            watermark_expression: Some(Self::watermark_expression(table)?),
+            schema: Arc::new(watermark_schema),
+        };
+
+        return Ok(LogicalPlan::Extension(Extension {
+            node: Arc::new(watermark_node),
+        }));
+    }
+}
diff --git a/arroyo-df/src/tables.rs b/arroyo-df/src/tables.rs
index 17a8f342c..9c013168e 100644
--- a/arroyo-df/src/tables.rs
+++ b/arroyo-df/src/tables.rs
@@ -64,7 +64,7 @@ impl FieldSpec {
             FieldSpec::VirtualField { .. } => true,
         }
     }
-    fn struct_field(&self) -> &Field {
+    pub fn field(&self) -> &Field {
         match self {
             FieldSpec::StructField(f) => f,
             FieldSpec::VirtualField { field, .. } => field,
@@ -158,7 +158,7 @@ impl ConnectorTable {
             .iter()
             .filter(|f| !f.is_virtual())
             .map(|f| {
-                let struct_field = f.struct_field();
+                let struct_field = f.field();
                 struct_field.clone().try_into().map_err(|_| {
                     anyhow!(
                         "field '{}' has a type '{:?}' that cannot be used in a connection table",
@@ -234,8 +234,8 @@ impl ConnectorTable {
                 .fields
                 .iter()
                 .find(|f| {
-                    f.struct_field().name() == field_name
-                        && matches!(f.struct_field().data_type(), DataType::Timestamp(..))
+                    f.field().name() == field_name
+                        && matches!(f.field().data_type(), DataType::Timestamp(..))
                 })
                 .ok_or_else(|| {
                     anyhow!(
@@ -244,9 +244,7 @@ impl ConnectorTable {
                     )
                 })?;
 
-            Ok(Some(Expr::Column(Column::from_name(
-                field.struct_field().name(),
-            ))))
+            Ok(Some(Expr::Column(Column::from_name(field.field().name()))))
         } else {
             Ok(None)
         }
@@ -259,8 +257,8 @@ impl ConnectorTable {
                 .fields
                 .iter()
                 .find(|f| {
-                    f.struct_field().name() == field_name
-                        && matches!(f.struct_field().data_type(), DataType::Timestamp(..))
+                    f.field().name() == field_name
+                        && matches!(f.field().data_type(), DataType::Timestamp(..))
                 })
                 .ok_or_else(|| {
                     anyhow!(
@@ -269,9 +267,7 @@ impl ConnectorTable {
                     )
                 })?;
 
-            Ok(Some(Expr::Column(Column::from_name(
-                field.struct_field().name(),
-            ))))
+            Ok(Some(Expr::Column(Column::from_name(field.field().name()))))
         } else {
             Ok(None)
         }
@@ -559,7 +555,7 @@ impl Table {
                     name,
                     fields: fields
                         .into_iter()
-                        .map(|f| Arc::new(f.struct_field().clone()))
+                        .map(|f| Arc::new(f.field().clone()))
                         .collect(),
                 }))
             }
@@ -658,7 +654,7 @@ impl Table {
                 .unwrap_or_else(|| {
                     fields
                         .iter()
-                        .map(|field| field.struct_field().clone().into())
+                        .map(|field| field.field().clone().into())
                         .collect()
                 }),
             Table::TableFromQuery { logical_plan, .. } => logical_plan
diff --git a/arroyo-df/src/watermark_node.rs b/arroyo-df/src/watermark_node.rs
new file mode 100644
index 000000000..20b1c5d20
--- /dev/null
+++ b/arroyo-df/src/watermark_node.rs
@@ -0,0 +1,48 @@
+use crate::schemas::add_timestamp_field;
+use datafusion_common::DFSchemaRef;
+use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+use std::fmt::Formatter;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct WatermarkNode {
+    pub input: LogicalPlan,
+    pub watermark_expression: Option<Expr>,
+    pub schema: DFSchemaRef,
+}
+
+impl UserDefinedLogicalNodeCore for WatermarkNode {
+    fn name(&self) -> &str {
+        "WatermarkNode"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        &self.schema
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        if let Some(expr) = &self.watermark_expression {
+            vec![expr.clone()]
+        } else {
+            vec![]
+        }
+    }
+
+    fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "WatermarkNode")
+    }
+
+    fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
+        assert_eq!(inputs.len(), 1, "input size inconsistent");
+        assert_eq!(exprs.len(), 1, "expression size inconsistent");
+
+        Self {
+            input: inputs[0].clone(),
+            watermark_expression: Some(exprs[0].clone()),
+            schema: add_timestamp_field(self.input.schema().clone()).unwrap(),
+        }
+    }
+}
diff --git a/arroyo-rpc/proto/api.proto b/arroyo-rpc/proto/api.proto
index 11c62e7fc..40fe149e1 100644
--- a/arroyo-rpc/proto/api.proto
+++ b/arroyo-rpc/proto/api.proto
@@ -259,6 +259,13 @@ message ExpressionWatermark {
   optional uint64 idle_time_micros = 3;
 }
 
+message ExpressionWatermarkConfig {
+  uint64 period_micros = 1;
+  optional uint64 idle_time_micros = 2;
+  ArroyoSchema input_schema = 3;
+  bytes expression = 4;
+}
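+
+// Note: `expression` carries a serialized datafusion `PhysicalExprNode` (built in
+// arroyo-df/src/plan_graph.rs); the worker decodes it with `parse_physical_expr` against
+// `input_schema` in arroyo-worker/src/operators/watermark_generator.rs.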
+
 message ExpressionOperator {
   string name = 1;
   string expression= 2;
diff --git a/arroyo-worker/src/engine.rs b/arroyo-worker/src/engine.rs
index 1adf279dd..799092477 100644
--- a/arroyo-worker/src/engine.rs
+++ b/arroyo-worker/src/engine.rs
@@ -34,7 +34,6 @@ use crate::connectors::sse::SSESourceFunc;
 use crate::metrics::{register_queue_gauges, QueueGauges, TaskCounters};
 use crate::network_manager::{NetworkManager, Quad, Senders};
 use crate::operator::{server_for_hash_array, ArrowOperatorConstructor, OperatorNode};
-use crate::operators::PeriodicWatermarkGenerator;
 use crate::{RateLimiter, METRICS_PUSH_INTERVAL, PROMETHEUS_PUSH_GATEWAY};
 use arroyo_datastream::logical::{
     LogicalEdge, LogicalEdgeType, LogicalGraph, LogicalNode, OperatorName,
@@ -57,6 +56,7 @@ use petgraph::visit::EdgeRef;
 use petgraph::Direction;
 use prometheus::labels;
 
+use crate::operators::watermark_generator::WatermarkGenerator;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 
 pub const QUEUE_SIZE: usize = 4 * 1024;
@@ -1419,8 +1419,8 @@ impl Engine {
 pub fn construct_operator(operator: OperatorName, config: Vec<u8>) -> OperatorNode {
     let mut buf = config.as_slice();
     match operator {
-        OperatorName::Watermark => {
-            PeriodicWatermarkGenerator::from_config(prost::Message::decode(&mut buf).unwrap())
+        OperatorName::ExpressionWatermark => {
+            WatermarkGenerator::from_config(prost::Message::decode(&mut buf).unwrap())
         }
         OperatorName::ArrowValue => {
             ValueExecutionOperator::from_config(prost::Message::decode(&mut buf).unwrap())
diff --git a/arroyo-worker/src/operators/mod.rs b/arroyo-worker/src/operators/mod.rs
index 9c750aa63..19e88aee1 100644
--- a/arroyo-worker/src/operators/mod.rs
+++ b/arroyo-worker/src/operators/mod.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::fs;
 use std::str::FromStr;
 use std::{fmt::Debug, path::PathBuf};
@@ -6,23 +5,15 @@ use std::{fmt::Debug, path::PathBuf};
 use std::marker::PhantomData;
 use std::ops::Add;
 
-use crate::engine::{ArrowContext, StreamNode};
+use crate::engine::StreamNode;
 use crate::old::{Collector, Context};
-use crate::operator::{get_timestamp_col, ArrowOperator, ArrowOperatorConstructor, OperatorNode};
-use arrow::compute::kernels;
-use arrow_array::RecordBatch;
 use arroyo_macro::process_fn;
-use arroyo_rpc::grpc::api::PeriodicWatermark;
-use arroyo_rpc::grpc::{api, TableConfig};
-use arroyo_state::global_table_config;
 use arroyo_types::{
-    from_millis, from_nanos, to_millis, ArrowMessage, CheckpointBarrier, Data, GlobalKey, Key,
-    Record, SignalMessage, TaskInfo, UpdatingData, Watermark, Window,
+    from_millis, to_millis, Data, GlobalKey, Key, Record, TaskInfo, UpdatingData, Window,
 };
-use async_trait::async_trait;
-use bincode::{config, Decode, Encode};
+use bincode::config;
 use std::time::{Duration, SystemTime};
-use tracing::{debug, info};
+use tracing::debug;
 use wasmtime::{
     Caller, Engine, InstanceAllocationStrategy, Linker, Module, PoolingAllocationConfig, Store,
     TypedFunc,
@@ -39,6 +30,7 @@ pub mod sliding_top_n_aggregating_window;
 pub mod tumbling_aggregating_window;
 pub mod tumbling_top_n_window;
 pub mod updating_aggregate;
+pub mod watermark_generator;
 pub mod windows;
 
 #[cfg(test)]
@@ -106,160 +98,6 @@ mod test {
     }
 }
 
-#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq)]
-pub struct PeriodicWatermarkGeneratorState {
-    last_watermark_emitted_at: SystemTime,
-    max_watermark: SystemTime,
-}
-
-pub struct PeriodicWatermarkGenerator {
-    interval: Duration,
-    max_lateness: Duration,
-    state_cache: PeriodicWatermarkGeneratorState,
-    idle_time: Option<Duration>,
-    last_event: SystemTime,
-    idle: bool,
-}
-
-impl PeriodicWatermarkGenerator {
-    pub fn fixed_lateness(
-        interval: Duration,
-        idle_time: Option<Duration>,
-        max_lateness: Duration,
-    ) -> PeriodicWatermarkGenerator {
-        PeriodicWatermarkGenerator {
-            interval,
-            state_cache: PeriodicWatermarkGeneratorState {
-                last_watermark_emitted_at: SystemTime::UNIX_EPOCH,
-                max_watermark: SystemTime::UNIX_EPOCH,
-            },
-            max_lateness,
-            idle_time,
-            last_event: SystemTime::now(),
-            idle: false,
-        }
-    }
-}
-
-impl ArrowOperatorConstructor for PeriodicWatermarkGenerator {
-    fn from_config(config: PeriodicWatermark) -> anyhow::Result<OperatorNode> {
-        Ok(OperatorNode::from_operator(Box::new(Self::fixed_lateness(
-            Duration::from_micros(config.period_micros),
-            config.idle_time_micros.map(Duration::from_micros),
-            Duration::from_micros(config.max_lateness_micros),
-        ))))
-    }
-}
-
-#[async_trait]
-impl ArrowOperator for PeriodicWatermarkGenerator {
-    fn tables(&self) -> HashMap<String, TableConfig> {
-        global_table_config("s", "periodic watermark state")
-    }
-
-    fn name(&self) -> String {
-        "periodic_watermark_generator".to_string()
-    }
-
-    fn tick_interval(&self) -> Option<Duration> {
-        Some(Duration::from_secs(1))
-    }
-
-    async fn on_start(&mut self, ctx: &mut ArrowContext) {
-        let gs = ctx
-            .table_manager
-            .get_global_keyed_state("s")
-            .await
-            .expect("should have watermark table.");
-        self.last_event = SystemTime::now();
-
-        let state =
-            *(gs.get(&ctx.task_info.task_index)
-                .unwrap_or(&PeriodicWatermarkGeneratorState {
-                    last_watermark_emitted_at: SystemTime::UNIX_EPOCH,
-                    max_watermark: SystemTime::UNIX_EPOCH,
-                }));
-
-        self.state_cache = state;
-    }
-
-    async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut ArrowContext) {
-        if let Some(SignalMessage::EndOfData) = final_message {
-            // send final watermark on close
-            ctx.collector
-                .broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
-                    Watermark::EventTime(from_millis(u64::MAX)),
-                )))
-                .await;
-        }
-    }
-
-    async fn process_batch(&mut self, record: RecordBatch, ctx: &mut ArrowContext) {
-        ctx.collector.collect(record.clone()).await;
-        self.last_event = SystemTime::now();
-
-        let timestamp_column = get_timestamp_col(&record, ctx);
-
-        let Some(min_timestamp) = kernels::aggregate::min(&timestamp_column) else {
-            return;
-        };
-        let min_timestamp = from_nanos(min_timestamp as u128);
-        let Some(max_timestamp) = kernels::aggregate::max(&timestamp_column) else {
-            return;
-        };
-
-        let max_timestamp = from_nanos(max_timestamp as u128);
-        let watermark = min_timestamp - self.max_lateness;
-
-        self.state_cache.max_watermark = self.state_cache.max_watermark.max(watermark);
-        if self.idle
-            || max_timestamp
-                .duration_since(self.state_cache.last_watermark_emitted_at)
-                .unwrap_or(Duration::ZERO)
-                > self.interval
-        {
-            debug!(
-                "[{}] Emitting watermark {}",
-                ctx.task_info.task_index,
-                to_millis(watermark)
-            );
-            ctx.collector
-                .broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
-                    Watermark::EventTime(watermark),
-                )))
-                .await;
-            self.state_cache.last_watermark_emitted_at = max_timestamp;
-            self.idle = false;
-        }
-    }
-
-    async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut ArrowContext) {
-        let gs = ctx
-            .table_manager
-            .get_global_keyed_state("s")
-            .await
-            .expect("state");
-
-        gs.insert(ctx.task_info.task_index, self.state_cache).await;
-    }
-
-    async fn handle_tick(&mut self, _: u64, ctx: &mut ArrowContext) {
-        if let Some(idle_time) = self.idle_time {
-            if self.last_event.elapsed().unwrap_or(Duration::ZERO) > idle_time && !self.idle {
-                info!(
-                    "Setting partition {} to idle after {:?}",
-                    ctx.task_info.task_index, idle_time
-                );
-                ctx.broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
-                    Watermark::Idle,
-                )))
-                .await;
-                self.idle = true;
-            }
-        }
-    }
-}
-
 pub trait TimeWindowAssigner: Copy + Clone + Send + 'static {
     fn windows(&self, ts: SystemTime) -> Vec<Window>;
diff --git a/arroyo-worker/src/operators/watermark_generator.rs b/arroyo-worker/src/operators/watermark_generator.rs
new file mode 100644
index 000000000..1bdc319d7
--- /dev/null
+++ b/arroyo-worker/src/operators/watermark_generator.rs
@@ -0,0 +1,188 @@
+use crate::engine::ArrowContext;
+use crate::operator::{get_timestamp_col, ArrowOperator, ArrowOperatorConstructor, OperatorNode};
+use arrow::compute::kernels;
+use arrow_array::RecordBatch;
+use arroyo_df::physical::EmptyRegistry;
+use arroyo_rpc::grpc::api::ExpressionWatermarkConfig;
+use arroyo_rpc::grpc::TableConfig;
+use arroyo_rpc::ArroyoSchema;
+use arroyo_state::global_table_config;
+use arroyo_types::{
+    from_millis, from_nanos, to_millis, ArrowMessage, CheckpointBarrier, SignalMessage, Watermark,
+};
+use async_trait::async_trait;
+use bincode::{Decode, Encode};
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
+use datafusion_proto::protobuf::PhysicalExprNode;
+use prost::Message;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime};
+use tracing::{debug, info};
+
+#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq)]
+pub struct WatermarkGeneratorState {
+    last_watermark_emitted_at: SystemTime,
+    max_watermark: SystemTime,
+}
+
+pub struct WatermarkGenerator {
+    interval: Duration,
+    state_cache: WatermarkGeneratorState,
+    idle_time: Option<Duration>,
+    last_event: SystemTime,
+    idle: bool,
+    expression: Arc<dyn PhysicalExpr>,
+}
+
+impl WatermarkGenerator {
+    pub fn expression(
+        interval: Duration,
+        idle_time: Option<Duration>,
+        expression: Arc<dyn PhysicalExpr>,
+    ) -> WatermarkGenerator {
+        WatermarkGenerator {
+            interval,
+            state_cache: WatermarkGeneratorState {
+                last_watermark_emitted_at: SystemTime::UNIX_EPOCH,
+                max_watermark: SystemTime::UNIX_EPOCH,
+            },
+            idle_time,
+            last_event: SystemTime::now(),
+            idle: false,
+            expression,
+        }
+    }
+}
+
+impl ArrowOperatorConstructor for WatermarkGenerator {
+    fn from_config(config: ExpressionWatermarkConfig) -> anyhow::Result<OperatorNode> {
+        let input_schema: ArroyoSchema = config.input_schema.unwrap().try_into()?;
+        let expression = PhysicalExprNode::decode(&mut config.expression.as_slice())?;
+        let expression = parse_physical_expr(&expression, &EmptyRegistry {}, &input_schema.schema)?;
+
+        Ok(OperatorNode::from_operator(Box::new(Self::expression(
+            Duration::from_micros(config.period_micros),
+            config.idle_time_micros.map(Duration::from_micros),
+            expression,
+        ))))
+    }
+}
+
+#[async_trait]
+impl ArrowOperator for WatermarkGenerator {
+    fn tables(&self) -> HashMap<String, TableConfig> {
+        global_table_config("s", "expression watermark state")
+    }
+
+    fn name(&self) -> String {
+        "expression_watermark_generator".to_string()
+    }
+
+    fn tick_interval(&self) -> Option<Duration> {
+        Some(Duration::from_secs(1))
+    }
+
+    async fn on_start(&mut self, ctx: &mut ArrowContext) {
+        let gs = ctx
+            .table_manager
+            .get_global_keyed_state("s")
+            .await
+            .expect("should have watermark table.");
+        self.last_event = SystemTime::now();
+
+        let state = *(gs
+            .get(&ctx.task_info.task_index)
+            .unwrap_or(&WatermarkGeneratorState {
+                last_watermark_emitted_at: SystemTime::UNIX_EPOCH,
+                max_watermark: SystemTime::UNIX_EPOCH,
+            }));
+
+        self.state_cache = state;
+    }
+
+    async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut ArrowContext) {
+        if let Some(SignalMessage::EndOfData) = final_message {
+            // send final watermark on close
+            ctx.collector
+                .broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
+                    Watermark::EventTime(from_millis(u64::MAX)),
+                )))
+                .await;
+        }
+    }
+
+    async fn process_batch(&mut self, record: RecordBatch, ctx: &mut ArrowContext) {
+        ctx.collector.collect(record.clone()).await;
+        self.last_event = SystemTime::now();
+
+        let timestamp_column = get_timestamp_col(&record, ctx);
+        let Some(max_timestamp) = kernels::aggregate::max(&timestamp_column) else {
+            return;
+        };
+        let max_timestamp = from_nanos(max_timestamp as u128);
+
+        // calculate watermark using expression
+        let watermark = self
+            .expression
+            .evaluate(&record)
+            .unwrap()
+            .into_array(record.num_rows())
+            .unwrap();
+
+        let watermark = watermark
+            .as_any()
+            .downcast_ref::<arrow_array::TimestampNanosecondArray>()
+            .unwrap();
+
+        let watermark = from_nanos(kernels::aggregate::min(watermark).unwrap() as u128);
+
+        self.state_cache.max_watermark = self.state_cache.max_watermark.max(watermark);
+        if self.idle
+            || max_timestamp
+                .duration_since(self.state_cache.last_watermark_emitted_at)
+                .unwrap_or(Duration::ZERO)
+                > self.interval
+        {
+            debug!(
+                "[{}] Emitting expression watermark {}",
+                ctx.task_info.task_index,
+                to_millis(watermark)
+            );
+            ctx.collector
+                .broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
+                    Watermark::EventTime(watermark),
+                )))
+                .await;
+            self.state_cache.last_watermark_emitted_at = max_timestamp;
+            self.idle = false;
+        }
+    }
+
+    async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut ArrowContext) {
+        let gs = ctx
+            .table_manager
+            .get_global_keyed_state("s")
+            .await
+            .expect("state");
+
+        gs.insert(ctx.task_info.task_index, self.state_cache).await;
+    }
+
+    async fn handle_tick(&mut self, _: u64, ctx: &mut ArrowContext) {
+        if let Some(idle_time) = self.idle_time {
+            if self.last_event.elapsed().unwrap_or(Duration::ZERO) > idle_time && !self.idle {
+                info!(
+                    "Setting partition {} to idle after {:?}",
+                    ctx.task_info.task_index, idle_time
+                );
+                ctx.broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
+                    Watermark::Idle,
+                )))
+                .await;
+                self.idle = true;
+            }
+        }
+    }
+}