Skip to content

Commit

Permalink
sql: Introduce updating queries. (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse authored Jun 15, 2023
1 parent ba55d2f commit 81e42e0
Show file tree
Hide file tree
Showing 26 changed files with 2,156 additions and 374 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: Install Kafka
run: |
wget --progress=dot --show-progress https://downloads.apache.org/kafka/3.2.3/kafka_2.12-3.2.3.tgz
wget --progress=dot --show-progress https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
tar xvfz kafka*.tgz
mkdir /tmp/kraft-combined-logs
kafka_*/bin/kafka-storage.sh format -t 9v5PspiySuWU2l5NjTgRuA -c kafka_*/config/kraft/server.properties
Expand Down
2 changes: 1 addition & 1 deletion arroyo-api/src/optimizations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl FusedExpressionOperatorBuilder {
}

fn get_operator(self) -> Operator {
let name = format!("fused<{}>", self.names.join(","));
let name = format!("api_fused<{}>", self.names.join(","));
let fused_body_tokens = self.body;
let return_type = self.current_return_type.unwrap();
let return_value = match return_type {
Expand Down
1 change: 1 addition & 0 deletions arroyo-api/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub fn nexmark_schema() -> SourceSchema {
vec![
SchemaField::new("id", Primitive(Int64)),
SchemaField::new("description", Primitive(String)),
SchemaField::new("item_name", Primitive(String)),
SchemaField::new("initial_bid", Primitive(Int64)),
SchemaField::new("reserve", Primitive(Int64)),
SchemaField::new("datetime", Primitive(UnixMillis)),
Expand Down
65 changes: 59 additions & 6 deletions arroyo-controller/src/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::states::fatal;
use anyhow::{anyhow, Result};
use arroyo_datastream::{
AggregateBehavior, EdgeType, Operator, Program, SlidingAggregatingTopN,
AggregateBehavior, EdgeType, NonWindowAggregator, Operator, Program, SlidingAggregatingTopN,
SlidingWindowAggregator, TumblingTopN, TumblingWindowAggregator, WasmBehavior, WatermarkType,
WindowType,
};
Expand Down Expand Up @@ -654,7 +654,7 @@ wasm-opt = false
let in_t = parse_type(&input.unwrap().weight().value);
let out_k = parse_type(&output.unwrap().weight().key);
let out_t = parse_type(&output.unwrap().weight().value);
let func: syn::ExprClosure = parse_str(&quote!(|record, _| {#expr}).to_string()).unwrap();
let func: syn::ExprClosure = parse_quote!(|record, _| {#expr});
match return_type {
arroyo_datastream::ExpressionReturnType::Predicate => {
quote! {
Expand Down Expand Up @@ -839,7 +839,7 @@ wasm-opt = false
#max_elements))
}
}
Operator::JoinWithExpiration { left_expiration, right_expiration } => {
Operator::JoinWithExpiration { left_expiration, right_expiration, join_type } => {
let mut inputs: Vec<_> = program.graph.edges_directed(idx, Direction::Incoming)
.collect();
inputs.sort_by_key(|e| e.weight().typ.clone());
Expand All @@ -851,10 +851,63 @@ wasm-opt = false
let in_t2 = parse_type(&inputs[1].weight().value);
let left_expiration = duration_to_syn_expr(*left_expiration);
let right_expiration = duration_to_syn_expr(*right_expiration);
match join_type {
arroyo_types::JoinType::Inner => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
inner_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Left => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
left_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Right => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
right_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Full => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
full_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
}
},
Operator::UpdatingOperator { name, expression } => {
let expr : syn::Expr = parse_str(expression).expect(expression);
let in_k = parse_type(&input.unwrap().weight().key);
let in_t = parse_type(&input.unwrap().weight().value);
let out_t = parse_type(&output.unwrap().weight().value);
let func: syn::ExprClosure = parse_quote!(|arg| {
#expr});
quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
JoinWithExpiration::<#in_k, #in_t1, #in_t2>::
new(#left_expiration, #right_expiration))
Box::new(arroyo_worker::operators::OptionMapOperator::<#in_k, #in_t, #in_k, #out_t>::
updating_operator(#name.to_string(), #func))
}
},
Operator::NonWindowAggregator(NonWindowAggregator { expiration, aggregator, bin_merger, bin_type }) => {
let in_k = parse_type(&input.unwrap().weight().key);
let in_t = parse_type(&input.unwrap().weight().value);
let updating_out_t = parse_type(&output.unwrap().weight().value);
let out_t = extract_container_type("UpdatingData", &updating_out_t).unwrap();
let bin_t = parse_type(bin_type);
let expiration = duration_to_syn_expr(*expiration);
let aggregator: syn::ExprClosure = parse_str(aggregator).unwrap();
let bin_merger: syn::ExprClosure = parse_str(bin_merger).unwrap();
quote!{
Box::new(arroyo_worker::operators::updating_aggregate::
UpdatingAggregateOperator::<#in_k, #in_t, #bin_t, #out_t>::
new(#expiration,
#aggregator,
#bin_merger))
}
},
Operator::UpdatingKeyOperator { name, expression } => {
let updating_in_t = parse_type(&input.unwrap().weight().value);
let in_t = extract_container_type("UpdatingData", &updating_in_t).unwrap();
let out_k = parse_type(&output.unwrap().weight().key);
let expr : syn::Expr = parse_str(expression).expect(expression);
quote! {
Box::new(arroyo_worker::operators::
KeyMapUpdatingOperator::<#in_t, #out_k>::
new(#name.to_string(), #expr))
}
},
};
Expand Down
104 changes: 99 additions & 5 deletions arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use arroyo_rpc::grpc::api::{
self as GrpcApi, ExpressionAggregator, Flatten, KafkaAuthConfig, ProgramEdge,
};
use arroyo_types::nexmark::Event;
use arroyo_types::{from_micros, string_to_map, to_micros, Data, GlobalKey, ImpulseEvent, Key};
use arroyo_types::{
from_micros, string_to_map, to_micros, Data, GlobalKey, ImpulseEvent, JoinType, Key,
};
use bincode::{Decode, Encode};
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
Expand Down Expand Up @@ -236,7 +238,18 @@ pub struct SlidingAggregatingTopN {
pub max_elements: usize,
}

#[derive(Copy, Clone, Encode, Decode, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Encode, Decode, Serialize, Deserialize, PartialEq, Eq)]
pub struct NonWindowAggregator {
pub expiration: Duration,
// fn(&BinA) -> OutT
pub aggregator: String,
// fn(&T, Option<&BinA>) -> Option<BinA>
pub bin_merger: String,
// BinA
pub bin_type: String,
}

#[derive(Copy, Clone, Debug, Encode, Decode, Serialize, Deserialize, PartialEq)]
pub enum ImpulseSpec {
Delay(Duration),
EventsPerSecond(f32),
Expand All @@ -248,6 +261,7 @@ pub enum SerializationMode {
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
JsonSchemaRegistry,
RawJson,
DebeziumJson,
}
impl SerializationMode {
pub fn from_has_registry_flag(has_registry: bool) -> Self {
Expand All @@ -262,9 +276,14 @@ impl SerializationMode {
Some("json") => Self::Json,
Some("json_schema_registry") => Self::JsonSchemaRegistry,
Some("raw_json") => Self::RawJson,
Some("debezium_json") => Self::DebeziumJson,
_ => Self::Json,
}
}

pub fn is_updating(&self) -> bool {
matches!(self, Self::DebeziumJson)
}
}

impl ToTokens for SerializationMode {
Expand All @@ -279,6 +298,9 @@ impl ToTokens for SerializationMode {
SerializationMode::RawJson => {
quote::quote!(arroyo_worker::operators::SerializationMode::RawJson)
}
SerializationMode::DebeziumJson => {
quote::quote!(arroyo_worker::operators::SerializationMode::Json)
}
};

tokens.append_all(serialization_mode);
Expand Down Expand Up @@ -371,6 +393,16 @@ pub enum Operator {
JoinWithExpiration {
left_expiration: Duration,
right_expiration: Duration,
join_type: JoinType,
},
UpdatingOperator {
name: String,
expression: String,
},
NonWindowAggregator(NonWindowAggregator),
UpdatingKeyOperator {
name: String,
expression: String,
},
}

Expand Down Expand Up @@ -490,7 +522,7 @@ impl Debug for Operator {

match self {
Operator::FileSource { dir, .. } => write!(f, "FileSource<{:?}>", dir),
Operator::ImpulseSource { .. } => write!(f, "ImpulseSource"),
Operator::ImpulseSource { spec, .. } => write!(f, "ImpulseSource<{:?}>", spec),
Operator::KafkaSource { topic, .. } => write!(f, "KafkaSource<{}>", topic),
Operator::EventSourceSource { url, .. } => {
write!(f, "EventSource<{}>", url)
Expand Down Expand Up @@ -587,11 +619,21 @@ impl Debug for Operator {
Operator::JoinWithExpiration {
left_expiration,
right_expiration,
join_type,
} => write!(
f,
"JoinWithExpiration<left_expire: {:?}, right_expire: {:?}>",
left_expiration, right_expiration
"JoinWithExpiration<left_expire: {:?}, right_expire: {:?}, join_type: {:?}>",
left_expiration, right_expiration, join_type
),
Operator::UpdatingOperator {
name,
expression: _,
} => write!(f, "updating<{}>", name,),
Operator::NonWindowAggregator(_) => write!(f, "NonWindowAggregator"),
Operator::UpdatingKeyOperator {
name,
expression: _,
} => write!(f, "updating_key<{}>", name),
}
}
}
Expand Down Expand Up @@ -1817,10 +1859,35 @@ impl From<Operator> for GrpcApi::operator::Operator {
Operator::JoinWithExpiration {
left_expiration,
right_expiration,
join_type,
} => GrpcOperator::JoinWithExpiration(GrpcApi::JoinWithExpiration {
left_expiration_micros: left_expiration.as_micros() as u64,
right_expiration_micros: right_expiration.as_micros() as u64,
join_type: match join_type {
JoinType::Inner => GrpcApi::JoinType::Inner,
JoinType::Left => GrpcApi::JoinType::Left,
JoinType::Right => GrpcApi::JoinType::Right,
JoinType::Full => GrpcApi::JoinType::Full,
}
.into(),
}),
Operator::UpdatingOperator { name, expression } => {
GrpcOperator::UpdatingOperator(GrpcApi::UpdatingOperator { name, expression })
}
Operator::NonWindowAggregator(NonWindowAggregator {
expiration,
aggregator,
bin_merger,
bin_type,
}) => GrpcOperator::NonWindowAggregator(GrpcApi::NonWindowAggregator {
expiration_micros: expiration.as_micros() as u64,
aggregator,
bin_merger,
bin_type,
}),
Operator::UpdatingKeyOperator { name, expression } => {
GrpcOperator::UpdatingKeyOperator(GrpcApi::UpdatingKeyOperator { name, expression })
}
}
}
}
Expand All @@ -1840,6 +1907,7 @@ impl From<SerializationMode> for GrpcApi::SerializationMode {
SerializationMode::Json => GrpcApi::SerializationMode::Json,
SerializationMode::JsonSchemaRegistry => GrpcApi::SerializationMode::JsonSchemaRegistry,
SerializationMode::RawJson => GrpcApi::SerializationMode::Raw,
SerializationMode::DebeziumJson => GrpcApi::SerializationMode::Json,
}
}
}
Expand Down Expand Up @@ -2152,9 +2220,17 @@ impl TryFrom<arroyo_rpc::grpc::api::Operator> for Operator {
GrpcOperator::JoinWithExpiration(GrpcApi::JoinWithExpiration {
left_expiration_micros,
right_expiration_micros,
join_type,
}) => Operator::JoinWithExpiration {
left_expiration: Duration::from_micros(left_expiration_micros),
right_expiration: Duration::from_micros(right_expiration_micros),
join_type: match GrpcApi::JoinType::from_i32(join_type) {
Some(GrpcApi::JoinType::Inner) => JoinType::Inner,
Some(GrpcApi::JoinType::Left) => JoinType::Left,
Some(GrpcApi::JoinType::Right) => JoinType::Right,
Some(GrpcApi::JoinType::Full) => JoinType::Full,
None => JoinType::Inner,
},
},
GrpcOperator::ExpressionWatermark(GrpcApi::ExpressionWatermark {
period_micros,
Expand All @@ -2163,6 +2239,24 @@ impl TryFrom<arroyo_rpc::grpc::api::Operator> for Operator {
period: Duration::from_micros(period_micros),
expression,
}),
GrpcOperator::UpdatingOperator(GrpcApi::UpdatingOperator { name, expression }) => {
Operator::UpdatingOperator { name, expression }
}
GrpcOperator::NonWindowAggregator(GrpcApi::NonWindowAggregator {
expiration_micros,
aggregator,
bin_merger,
bin_type,
}) => Operator::NonWindowAggregator(NonWindowAggregator {
expiration: Duration::from_micros(expiration_micros),
aggregator,
bin_merger,
bin_type,
}),
GrpcOperator::UpdatingKeyOperator(GrpcApi::UpdatingKeyOperator {
name,
expression,
}) => Operator::UpdatingKeyOperator { name, expression },
},
None => bail!("unset on operator {:?}", operator),
};
Expand Down
2 changes: 1 addition & 1 deletion arroyo-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ fn impl_stream_node_type(
#i => {
let message = match item {
crate::engine::QueueItem::Data(datum) => {
*datum.downcast().unwrap()
*datum.downcast().expect(&format!("failed to downcast data in {}", self.name()))
}
crate::engine::QueueItem::Bytes(bs) => {
ctx.counters
Expand Down
29 changes: 29 additions & 0 deletions arroyo-rpc/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ message Operator {
SlidingAggregatingTopN sliding_aggregating_top_n = 20;
JoinWithExpiration join_with_expiration = 21;
ExpressionWatermark expression_watermark = 23;
UpdatingOperator updating_operator = 24;
NonWindowAggregator non_window_aggregator = 25;
UpdatingKeyOperator updating_key_operator = 26;
}
}

Expand Down Expand Up @@ -355,14 +358,40 @@ message SlidingAggregatingTopN {
message JoinWithExpiration {
uint64 left_expiration_micros = 1;
uint64 right_expiration_micros = 2;
JoinType join_type = 3;
}

message UpdatingOperator {
string name = 1;
string expression = 2;
}

message NonWindowAggregator {
uint64 expiration_micros = 1;
string aggregator = 2;
string bin_merger = 3;
string bin_type = 4;
}

message UpdatingKeyOperator {
string name = 1;
string expression = 2;
}

enum ExpressionReturnType {
UNUSED_ERT = 0;
PREDICATE = 1;
RECORD = 2;
OPTIONAL_RECORD = 3;
}

enum JoinType {
INNER = 0;
LEFT = 1;
RIGHT = 2;
FULL = 3;
}

enum OffsetMode {
EARLIEST = 0;
LATEST = 1;
Expand Down
Loading

0 comments on commit 81e42e0

Please sign in to comment.