diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dc798bc4c..ffb123b16 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -76,11 +76,17 @@ jobs: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh - name: Install Kafka run: | - wget --progress=dot --show-progress https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz + wget --progress=dot --show-progress https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz tar xvfz kafka*.tgz mkdir /tmp/kraft-combined-logs kafka_*/bin/kafka-storage.sh format -t 9v5PspiySuWU2l5NjTgRuA -c kafka_*/config/kraft/server.properties kafka_*/bin/kafka-server-start.sh -daemon kafka_*/config/kraft/server.properties + - name: Install mosquitto + run: | + sudo apt-get install -y mosquitto + sudo service mosquitto start + - name: Check Formatting + run: cargo fmt -- --check - name: Build run: cargo build --all-features - name: Validate API diff --git a/Cargo.lock b/Cargo.lock index 36c1ba1ad..c94e3f0c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -587,14 +587,19 @@ dependencies = [ "prost 0.12.3", "rand", "rdkafka", + "rdkafka-sys", "redis", "regex", "regress 0.7.1", "reqwest", + "rumqttc", + "rustls-native-certs 0.6.3", + "rustls-pemfile 1.0.4", "schemars", "serde", "serde_json", "tokio", + "tokio-rustls 0.24.1", "tokio-stream", "tokio-tungstenite", "tonic", @@ -726,7 +731,7 @@ dependencies = [ "test-log", "tokio", "tokio-stream", - "toml 0.8.10", + "toml 0.8.11", "tracing", "tracing-subscriber", "unicase", @@ -1131,7 +1136,7 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.2.0", "async-executor", - "async-io 2.3.1", + "async-io 2.3.2", "async-lock 3.3.0", "blocking", "futures-lite 2.2.0", @@ -1160,9 +1165,9 @@ dependencies = [ [[package]] name = "async-io" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f97ab0c5b00a7cdbe5a371b9a782ee7be1316095885c8a4ea1daf490eb0ef65" +checksum = "dcccb0f599cfa2f8ace422d3555572f47424da5648a4382a9dd0310ff8210884" dependencies = [ "async-lock 3.3.0", "cfg-if", @@ -1250,7 +1255,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" dependencies = [ - "async-io 2.3.1", + "async-io 2.3.2", "async-lock 2.8.0", "atomic-waker", "cfg-if", @@ -2972,8 +2977,8 @@ dependencies = [ "datafusion-common 36.0.0", "paste", "sqlparser 0.43.1", - "strum 0.26.1", - "strum_macros 0.26.1", + "strum 0.26.2", + "strum_macros 0.26.2", ] [[package]] @@ -3862,6 +3867,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fluvio" version = "0.21.7" @@ -3897,7 +3913,7 @@ dependencies = [ "siphasher 1.0.0", "thiserror", "tokio", - "toml 0.8.10", + "toml 0.8.11", "tracing", ] @@ -3940,7 +3956,7 @@ dependencies = [ "serde", "serde_yaml", "thiserror", - "toml 0.8.10", + "toml 0.8.11", "tracing", ] @@ -4626,12 +4642,12 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +checksum = 
"0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", - "futures-util", + "futures-core", "http 1.1.0", "http-body 1.0.0", "pin-project-lite", @@ -6958,6 +6974,7 @@ dependencies = [ "serde_json", "slab", "tokio", + "tracing", ] [[package]] @@ -7062,7 +7079,7 @@ dependencies = [ "time", "tokio", "tokio-postgres", - "toml 0.8.10", + "toml 0.8.11", "url", "walkdir", ] @@ -7171,9 +7188,9 @@ checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc" [[package]] name = "reqwest" -version = "0.11.24" +version = "0.11.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" +checksum = "0eea5a9eb898d3783f17c6407670e3592fd174cb81a10e51d4c37f49450b9946" dependencies = [ "base64 0.21.7", "bytes", @@ -7328,6 +7345,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rumqttc" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8" +dependencies = [ + "bytes", + "flume", + "futures-util", + "log", + "rustls-native-certs 0.6.3", + "rustls-pemfile 1.0.4", + "rustls-webpki", + "thiserror", + "tokio", + "tokio-rustls 0.24.1", + "url", +] + [[package]] name = "rusoto_core" version = "0.47.0" @@ -7957,9 +7993,9 @@ dependencies = [ [[package]] name = "serde_path_to_error" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd154a240de39fdebcf5775d2675c204d7c13cf39a4c697be6493c8e734337c" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" dependencies = [ "itoa", "serde", @@ -8254,6 +8290,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spinning_top" @@ -8380,11 +8419,11 @@ dependencies = [ [[package]] name = "strum" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" +checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" dependencies = [ - "strum_macros 0.26.1", + "strum_macros 0.26.2", ] [[package]] @@ -8402,9 +8441,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" +checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" dependencies = [ "heck", "proc-macro2", @@ -8500,20 +8539,20 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "system-configuration" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.2", "core-foundation", "system-configuration-sys", ] [[package]] name = "system-configuration-sys" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +checksum = 
"8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" dependencies = [ "core-foundation-sys", "libc", @@ -8911,15 +8950,15 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" +checksum = "af06656561d28735e9c1cd63dfd57132c8155426aa6af24f36a00a351f88c48e" dependencies = [ "indexmap 2.2.5", "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.6", + "toml_edit 0.22.7", ] [[package]] @@ -8946,9 +8985,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.6" +version = "0.22.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6" +checksum = "18769cd1cec395d70860ceb4d932812a0b4d06b1a4bb336745a4d21b9496e992" dependencies = [ "indexmap 2.2.5", "serde", @@ -9476,9 +9515,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "value-bag" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "126e423afe2dd9ac52142e7e9d5ce4135d7e13776c529d27fd6bc49f19e3280b" +checksum = "8fec26a25bd6fca441cdd0f769fd7f891bae119f996de31f86a5eddccef54c1d" [[package]] name = "vcpkg" @@ -9981,9 +10020,9 @@ dependencies = [ [[package]] name = "whoami" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fec781d48b41f8163426ed18e8fc2864c12937df9ce54c88ede7bd47270893e" +checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" dependencies = [ "redox_syscall 0.4.1", "wasite", diff --git a/crates/arroyo-api/src/connection_tables.rs b/crates/arroyo-api/src/connection_tables.rs index 7a3242139..afe29cfd2 100644 --- a/crates/arroyo-api/src/connection_tables.rs +++ b/crates/arroyo-api/src/connection_tables.rs @@ -632,14 +632,18 @@ async fn get_schema( )); }; - let resolver = - ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key.clone(), api_secret.clone()) - .map_err(|e| { - bad_request(format!( - "failed to fetch schemas from schema repository: {}", - e - )) - })?; + let resolver = ConfluentSchemaRegistry::new( + &endpoint, + &table.subject(), + api_key.clone(), + api_secret.clone(), + ) + .map_err(|e| { + bad_request(format!( + "failed to fetch schemas from schema repository: {}", + e + )) + })?; resolver.get_schema_for_version(None).await.map_err(|e| { bad_request(format!( diff --git a/crates/arroyo-api/src/pipelines.rs b/crates/arroyo-api/src/pipelines.rs index c823f816a..066f10577 100644 --- a/crates/arroyo-api/src/pipelines.rs +++ b/crates/arroyo-api/src/pipelines.rs @@ -207,7 +207,7 @@ async fn try_register_confluent_schema( }; let schema_registry = - ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key, api_secret)?; + ConfluentSchemaRegistry::new(&endpoint, &table.subject(), api_key, api_secret)?; match config.format.clone() { Some(Format::Avro(mut avro)) => { diff --git a/crates/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml index b44d77828..c74ddb064 100644 --- a/crates/arroyo-connectors/Cargo.toml +++ b/crates/arroyo-connectors/Cargo.toml @@ -48,7 +48,8 @@ regex = "1" ########################## # Kafka -rdkafka = { version = "0.33", features = ["cmake-build"] } +rdkafka = { version = "0.33", features = ["cmake-build", "tracing"] } +rdkafka-sys = "4.5.0" # SSE eventsource-client = "0.12.0" @@ 
-77,5 +78,11 @@ object_store = { workspace = true } deltalake = {version = "0.17", features = ["s3", "datafusion"] } async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] } +# MQTT +rumqttc = { version = "0.23.0", features = ["url"] } +rustls-native-certs = "0.6" +rustls-pemfile = "1" +tokio-rustls = "0.24" + [build-dependencies] -glob = "0.3" \ No newline at end of file +glob = "0.3" diff --git a/crates/arroyo-connectors/src/confluent/mod.rs b/crates/arroyo-connectors/src/confluent/mod.rs index 6c7887302..c694b9ef7 100644 --- a/crates/arroyo-connectors/src/confluent/mod.rs +++ b/crates/arroyo-connectors/src/confluent/mod.rs @@ -4,10 +4,12 @@ use crate::kafka::{ use crate::{kafka, pull_opt}; use anyhow::anyhow; use arroyo_operator::connector::{Connection, Connector}; +use arroyo_operator::operator::OperatorNode; use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, }; use arroyo_rpc::var_str::VarStr; +use arroyo_rpc::OperatorConfig; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::sync::mpsc::Sender; @@ -186,4 +188,13 @@ impl Connector for ConfluentConnector { .insert("client.id".to_string(), CLIENT_ID.to_string()); KafkaConnector {}.from_config(id, name, config.into(), table, schema) } + + fn make_operator( + &self, + profile: Self::ProfileT, + table: Self::TableT, + config: OperatorConfig, + ) -> anyhow::Result { + KafkaConnector {}.make_operator(profile.into(), table, config) + } } diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index c704a8818..3341eda13 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -9,6 +9,7 @@ use arroyo_rpc::schema_resolver::{ ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, FailingSchemaResolver, SchemaResolver, }; use arroyo_rpc::{schema_resolver, var_str::VarStr, OperatorConfig}; +use arroyo_types::string_to_map; use futures::TryFutureExt; use rdkafka::{ consumer::{BaseConsumer, Consumer}, @@ -16,6 +17,7 @@ use rdkafka::{ }; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::borrow::Cow; use std::collections::HashMap; use std::num::NonZeroU32; use std::sync::Arc; @@ -49,6 +51,15 @@ import_types!( ); import_types!(schema = "src/kafka/table.json"); +impl KafkaTable { + pub fn subject(&self) -> Cow { + match &self.value_subject { + None => Cow::Owned(format!("{}-value", self.topic)), + Some(s) => Cow::Borrowed(s), + } + } +} + pub struct KafkaConnector {} impl KafkaConnector { @@ -127,7 +138,16 @@ impl KafkaConnector { Ok(KafkaTable { topic: pull_opt("topic", options)?, type_: table_type, - client_configs: HashMap::new(), + client_configs: options + .remove("client_configs") + .map(|c| { + string_to_map(&c, '=').ok_or_else(|| { + anyhow!("invalid client_config: expected comma and equals-separated pairs") + }) + }) + .transpose()? 
+ .unwrap_or_else(|| HashMap::new()), + value_subject: options.remove("value.subject"), }) } } @@ -334,7 +354,7 @@ impl Connector for KafkaConnector { Arc::new( ConfluentSchemaRegistry::new( &endpoint, - &table.topic, + &table.subject(), api_key.clone(), api_secret.clone(), ) @@ -389,7 +409,7 @@ pub struct TopicMetadata { } impl KafkaTester { - async fn connect(&self) -> Result { + async fn connect(&self, table: Option) -> Result { let mut client_config = ClientConfig::new(); client_config .set( @@ -422,6 +442,12 @@ impl KafkaTester { } }; + if let Some(table) = table { + for (k, v) in table.client_configs { + client_config.set(k, v); + } + } + let client: BaseConsumer = client_config .create() .map_err(|e| format!("invalid kafka config: {:?}", e))?; @@ -439,7 +465,10 @@ impl KafkaTester { #[allow(unused)] pub async fn topic_metadata(&self, topic: &str) -> Result { - let client = self.connect().await.map_err(Status::failed_precondition)?; + let client = self + .connect(None) + .await + .map_err(Status::failed_precondition)?; let topic = topic.to_string(); tokio::task::spawn_blocking(move || { @@ -476,7 +505,7 @@ impl KafkaTester { } async fn fetch_topics(&self) -> anyhow::Result> { - let client = self.connect().await.map_err(|e| anyhow!("{}", e))?; + let client = self.connect(None).await.map_err(|e| anyhow!("{}", e))?; tokio::task::spawn_blocking(move || { let metadata = client @@ -548,7 +577,7 @@ impl KafkaTester { api_secret, }) => schema_resolver::ConfluentSchemaRegistry::new( endpoint, - &table.topic, + &table.subject(), api_key.clone(), api_secret.clone(), ), @@ -637,7 +666,10 @@ impl KafkaTester { .and_then(|s| s.format.clone()) .ok_or_else(|| anyhow!("No format defined for Kafka connection"))?; - let client = self.connect().await.map_err(|e| anyhow!("{}", e))?; + let client = self + .connect(Some(table.clone())) + .await + .map_err(|e| anyhow!("{}", e))?; self.info(&mut tx, "Connected to Kafka").await; @@ -747,7 +779,7 @@ impl KafkaTester { #[allow(unused)] pub async fn test_connection(&self) -> TestSourceMessage { - match self.connect().await { + match self.connect(None).await { Ok(_) => TestSourceMessage { error: false, done: true, @@ -829,15 +861,15 @@ pub fn client_configs(connection: &KafkaConfig, table: &KafkaTable) -> HashMap() .unwrap(); - println!("A = {:?}", a); - for v in a { assert_eq!( expected_values diff --git a/crates/arroyo-connectors/src/kafka/table.json b/crates/arroyo-connectors/src/kafka/table.json index 87e4a7c9e..dcbba81f2 100644 --- a/crates/arroyo-connectors/src/kafka/table.json +++ b/crates/arroyo-connectors/src/kafka/table.json @@ -72,6 +72,11 @@ "additionalProperties": { "type": "string" } + }, + "value_subject": { + "type": "string", + "title": "Schema Registry value subject", + "description": "Set this to use a non-standard subject for this topic in Confluent Schema Registry (defaults to `{TOPIC}_value`)" } }, "required": [ diff --git a/crates/arroyo-connectors/src/lib.rs b/crates/arroyo-connectors/src/lib.rs index 4920564d6..b9c1a1751 100644 --- a/crates/arroyo-connectors/src/lib.rs +++ b/crates/arroyo-connectors/src/lib.rs @@ -1,4 +1,13 @@ +use crate::confluent::ConfluentConnector; +use crate::filesystem::delta::DeltaLakeConnector; +use crate::filesystem::FileSystemConnector; +use crate::kinesis::KinesisConnector; +use crate::mqtt::MqttConnector; +use crate::polling_http::PollingHTTPConnector; use crate::preview::PreviewConnector; +use crate::redis::RedisConnector; +use crate::single_file::SingleFileConnector; +use crate::webhook::WebhookConnector; 
use anyhow::{anyhow, bail, Context}; use arroyo_operator::connector::ErasedConnector; use arroyo_rpc::api_types::connections::{ @@ -30,6 +39,7 @@ pub mod fluvio; pub mod impulse; pub mod kafka; pub mod kinesis; +pub mod mqtt; pub mod nexmark; pub mod polling_http; pub mod preview; @@ -42,20 +52,21 @@ pub mod websocket; pub fn connectors() -> HashMap<&'static str, Box> { let connectors: Vec> = vec![ Box::new(BlackholeConnector {}), - Box::new(confluent::ConfluentConnector {}), - Box::new(filesystem::delta::DeltaLakeConnector {}), - Box::new(filesystem::FileSystemConnector {}), + Box::new(ConfluentConnector {}), + Box::new(DeltaLakeConnector {}), + Box::new(FileSystemConnector {}), Box::new(FluvioConnector {}), Box::new(ImpulseConnector {}), Box::new(KafkaConnector {}), - Box::new(kinesis::KinesisConnector {}), + Box::new(KinesisConnector {}), + Box::new(MqttConnector {}), Box::new(NexmarkConnector {}), - Box::new(polling_http::PollingHTTPConnector {}), + Box::new(PollingHTTPConnector {}), Box::new(PreviewConnector {}), - Box::new(redis::RedisConnector {}), - Box::new(single_file::SingleFileConnector {}), + Box::new(RedisConnector {}), + Box::new(SingleFileConnector {}), Box::new(SSEConnector {}), - Box::new(webhook::WebhookConnector {}), + Box::new(WebhookConnector {}), Box::new(WebsocketConnector {}), ]; @@ -129,7 +140,7 @@ fn construct_http_client(endpoint: &str, headers: Option) -> anyhow::Res }; let headers: anyhow::Result = - string_to_map(headers.as_ref().map(|t| t.as_str()).unwrap_or("")) + string_to_map(headers.as_ref().map(|t| t.as_str()).unwrap_or(""), ':') .expect("Invalid header map") .into_iter() .map(|(k, v)| { @@ -156,6 +167,7 @@ pub fn header_map(headers: Option) -> HashMap { &headers .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) .unwrap_or("".to_string()), + ':', ) .expect("Invalid header map") } diff --git a/crates/arroyo-connectors/src/mqtt/mod.rs b/crates/arroyo-connectors/src/mqtt/mod.rs new file mode 100644 index 000000000..8934a96ae --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/mod.rs @@ -0,0 +1,472 @@ +use std::collections::HashMap; +use std::num::NonZeroU32; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::time::Duration; + +use crate::mqtt::sink::MqttSinkFunc; +use crate::mqtt::source::MqttSourceFunc; +use crate::pull_opt; +use anyhow::{anyhow, bail}; +use arroyo_formats::ser::ArrowSerializer; +use arroyo_operator::connector::{Connection, Connector}; +use arroyo_operator::operator::OperatorNode; +use arroyo_rpc::api_types::connections::{ + ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, +}; +use arroyo_rpc::{var_str::VarStr, OperatorConfig}; +use rumqttc::v5::mqttbytes::QoS; +use rumqttc::v5::{AsyncClient, Event as MqttEvent, EventLoop, Incoming, MqttOptions}; +use rumqttc::Outgoing; +use rustls_native_certs::load_native_certs; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot::Receiver; +use tokio_rustls::rustls::{Certificate, ClientConfig, PrivateKey, RootCertStore}; +use typify::import_types; + +const CONFIG_SCHEMA: &str = include_str!("./profile.json"); +const TABLE_SCHEMA: &str = include_str!("./table.json"); +const ICON: &str = include_str!("./mqtt.svg"); + +pub mod sink; +pub mod source; + +import_types!( + schema = "src/mqtt/profile.json", + convert = { + {type = "string", format = "var-str"} = VarStr + } +); +import_types!(schema = "src/mqtt/table.json"); +pub struct MqttConnector {} + +impl MqttTable { + pub fn qos(&self) -> QoS { + 
self.qos + .and_then(|qos| match qos { + QualityOfService::AtMostOnce => Some(QoS::AtMostOnce), + QualityOfService::AtLeastOnce => Some(QoS::AtLeastOnce), + QualityOfService::ExactlyOnce => Some(QoS::ExactlyOnce), + }) + .unwrap_or(QoS::AtMostOnce) + } +} + +impl MqttConnector { + pub fn connection_from_options( + options: &mut HashMap<String, String>, + ) -> anyhow::Result<MqttConfig> { + let url = match options.remove("url") { + Some(host) => host, + None => bail!("url is required for mqtt connection"), + }; + let username = options.remove("username").map(VarStr::new); + let password = options.remove("password").map(VarStr::new); + + let ca = options.remove("tls.ca").map(VarStr::new); + let cert = options.remove("tls.cert").map(VarStr::new); + let key = options.remove("tls.key").map(VarStr::new); + + let parsed_url = url::Url::parse(&url)?; + + let tls = if matches!(parsed_url.scheme(), "mqtts" | "ssl") { + Some(Tls { ca, cert, key }) + } else { + None + }; + + Ok(MqttConfig { + url, + username, + password, + tls, + client_prefix: options.remove("client_prefix"), + }) + } + + pub fn table_from_options(options: &mut HashMap<String, String>) -> anyhow::Result<MqttTable> { + let typ = pull_opt("type", options)?; + let qos = options + .remove("qos") + .map(|s| { + QualityOfService::try_from(s).map_err(|s| anyhow!("invalid value for 'qos': {s}")) + }) + .transpose()?; + + let table_type = match typ.as_str() { + "source" => TableType::Source {}, + "sink" => TableType::Sink { + retain: options + .remove("sink.retain") + .map(|s| { + s.parse::<bool>() + .map_err(|_| anyhow!("'sink.retain' must be either 'true' or 'false'")) + }) + .transpose()? + .unwrap_or(false), + }, + _ => { + bail!("type must be one of 'source' or 'sink'") + } + }; + + Ok(MqttTable { + topic: pull_opt("topic", options)?, + type_: table_type, + qos, + }) + } +} + +impl Connector for MqttConnector { + type ProfileT = MqttConfig; + type TableT = MqttTable; + + fn name(&self) -> &'static str { + "mqtt" + } + + fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector { + arroyo_rpc::api_types::connections::Connector { + id: "mqtt".to_string(), + name: "Mqtt".to_string(), + icon: ICON.to_string(), + description: "Read and write from an MQTT broker".to_string(), + enabled: true, + source: true, + sink: true, + testing: true, + hidden: false, + custom_schemas: true, + connection_config: Some(CONFIG_SCHEMA.to_string()), + table_config: TABLE_SCHEMA.to_string(), + } + } + + fn config_description(&self, config: Self::ProfileT) -> String { + config.url.clone() + } + + fn from_config( + &self, + id: Option<i64>, + name: &str, + config: MqttConfig, + table: MqttTable, + schema: Option<&ConnectionSchema>, + ) -> anyhow::Result<Connection> { + let (typ, desc) = match table.type_ { + TableType::Source { .. } => ( + ConnectionType::Source, + format!("MqttSource<{}>", table.topic), + ), + TableType::Sink { ..
} => (ConnectionType::Sink, format!("MqttSink<{}>", table.topic)), + }; + + let schema = schema + .map(|s| s.to_owned()) + .ok_or_else(|| anyhow!("No schema defined for Mqtt connection"))?; + + let format = schema + .format + .as_ref() + .map(|t| t.to_owned()) + .ok_or_else(|| anyhow!("'format' must be set for Mqtt connection"))?; + + let config = OperatorConfig { + connection: serde_json::to_value(config).unwrap(), + table: serde_json::to_value(table).unwrap(), + rate_limit: None, + format: Some(format), + bad_data: schema.bad_data.clone(), + framing: schema.framing.clone(), + }; + + Ok(Connection { + id, + connector: self.name(), + name: name.to_string(), + connection_type: typ, + schema, + config: serde_json::to_string(&config).unwrap(), + description: desc, + }) + } + + fn test_profile(&self, profile: Self::ProfileT) -> Option> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + let (itx, _rx) = tokio::sync::mpsc::channel(8); + let message = match test_inner(profile, None, itx).await { + Ok(_) => TestSourceMessage::done("Successfully connected to Mqtt"), + Err(e) => TestSourceMessage::fail(format!("Failed to connect to Mqtt: {:?}", e)), + }; + + tx.send(message).unwrap(); + }); + + Some(rx) + } + + fn test( + &self, + _: &str, + config: Self::ProfileT, + table: Self::TableT, + _schema: Option<&ConnectionSchema>, + tx: Sender, + ) { + tokio::task::spawn(async move { + let resp = match test_inner(config, Some(table), tx.clone()).await { + Ok(c) => TestSourceMessage::done(c), + Err(e) => TestSourceMessage::fail(e.to_string()), + }; + + tx.send(resp).await.unwrap(); + }); + } + + fn table_type(&self, _: Self::ProfileT, table: Self::TableT) -> ConnectionType { + match table.type_ { + TableType::Source { .. } => ConnectionType::Source, + TableType::Sink { .. 
} => ConnectionType::Sink, + } + } + + fn from_options( + &self, + name: &str, + options: &mut HashMap, + schema: Option<&ConnectionSchema>, + profile: Option<&ConnectionProfile>, + ) -> anyhow::Result { + let connection = profile + .map(|p| { + serde_json::from_value(p.config.clone()).map_err(|e| { + anyhow!("invalid config for profile '{}' in database: {}", p.id, e) + }) + }) + .unwrap_or_else(|| Self::connection_from_options(options))?; + + let table = Self::table_from_options(options)?; + + Self::from_config(&self, None, name, connection, table, schema) + } + + fn make_operator( + &self, + profile: Self::ProfileT, + table: Self::TableT, + config: OperatorConfig, + ) -> anyhow::Result { + let qos = table.qos(); + Ok(match table.type_ { + TableType::Source {} => OperatorNode::from_source(Box::new(MqttSourceFunc { + config: profile, + topic: table.topic, + qos, + format: config + .format + .ok_or_else(|| anyhow!("format is required for mqtt source"))?, + framing: config.framing, + bad_data: config.bad_data, + messages_per_second: NonZeroU32::new( + config + .rate_limit + .map(|l| l.messages_per_second) + .unwrap_or(u32::MAX), + ) + .unwrap(), + subscribed: Arc::new(AtomicBool::new(false)), + })), + TableType::Sink { retain } => OperatorNode::from_operator(Box::new(MqttSinkFunc { + config: profile, + qos, + topic: table.topic, + retain, + serializer: ArrowSerializer::new( + config + .format + .ok_or_else(|| anyhow!("format is required for mqtt sink"))?, + ), + stopped: Arc::new(AtomicBool::new(false)), + client: None, + })), + }) + } +} + +async fn test_inner( + c: MqttConfig, + t: Option, + tx: Sender, +) -> anyhow::Result { + tx.send(TestSourceMessage::info("Connecting to Mqtt")) + .await + .unwrap(); + + let (client, mut eventloop) = create_connection(&c, 0)?; + + let wait_for_incomming = match t { + Some(t) => { + let topic = t.topic; + let qos = t + .qos + .and_then(|qos| match qos { + QualityOfService::AtMostOnce => Some(QoS::AtMostOnce), + QualityOfService::AtLeastOnce => Some(QoS::AtLeastOnce), + QualityOfService::ExactlyOnce => Some(QoS::ExactlyOnce), + }) + .unwrap_or(QoS::AtMostOnce); + if let TableType::Sink { retain, .. } = t.type_ { + client + .publish(topic, qos, retain, "test".as_bytes()) + .await?; + false + } else { + client.subscribe(&topic, qos).await?; + client.publish(topic, qos, false, "test".as_bytes()).await?; + true + } + } + None => { + client + .publish("test-arroyo", QoS::AtMostOnce, false, "test".as_bytes()) + .await?; + false + } + }; + + loop { + match eventloop.poll().await { + Ok(notification) => match notification { + MqttEvent::Incoming(Incoming::Publish(p)) => { + let _payload = String::from_utf8(p.payload.to_vec())?; + return Ok("Successfully subscribed".to_string()); + } + MqttEvent::Outgoing(Outgoing::Publish(_p)) => { + if !wait_for_incomming { + return Ok("Successfully published".to_string()); + } + } + MqttEvent::Incoming(Incoming::Disconnect { .. 
}) + | MqttEvent::Outgoing(Outgoing::Disconnect) => { + bail!("Disconnected from Mqtt"); + } + _ => (), + }, + Err(e) => bail!("Error while reading from Mqtt: {:?}", e), + } + } +} + +fn load_certs(certificates: &str) -> anyhow::Result> { + let cert_bytes = std::fs::read_to_string(certificates).map_or_else( + |_| certificates.as_bytes().to_owned(), + |certs| certs.as_bytes().to_owned(), + ); + + let certs = rustls_pemfile::certs(&mut cert_bytes.as_slice()).map_err(|err| anyhow!(err))?; + + Ok(certs.into_iter().map(Certificate).collect()) +} + +fn load_private_key(certificate: &str) -> anyhow::Result { + let cert_bytes = std::fs::read_to_string(certificate).map_or_else( + |_| certificate.as_bytes().to_owned(), + |cert| cert.as_bytes().to_owned(), + ); + + let certs = rustls_pemfile::pkcs8_private_keys(&mut cert_bytes.as_slice()) + .map_err(|err| anyhow!(err))?; + let cert = certs + .into_iter() + .next() + .ok_or_else(|| anyhow!("No private key found"))?; + Ok(PrivateKey(cert)) +} + +pub(crate) fn create_connection( + c: &MqttConfig, + task_id: usize, +) -> anyhow::Result<(AsyncClient, EventLoop)> { + // It creates a client id with the format: _ + // because the client id must be unique for each connection. Otherwise, the broker will only keep one active connection + // per client id + let client_id = format!( + "{}_{}{}", + c.client_prefix + .as_ref() + .map(|s| s.as_str()) + .unwrap_or_else(|| "arroyo-mqtt"), + task_id, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + % 100000, + ); + + let mut url = url::Url::parse(&c.url)?; + let ssl = matches!(url.scheme(), "mqtts" | "ssl"); + url.query_pairs_mut().append_pair("client_id", &client_id); + + let mut options = MqttOptions::try_from(url)?; + + options.set_keep_alive(Duration::from_secs(10)); + if ssl { + let mut root_cert_store = RootCertStore::empty(); + + if let Some(ca) = c.tls.as_ref().and_then(|tls| tls.ca.as_ref()) { + let ca = ca.sub_env_vars().map_err(|e| anyhow!("{}", e))?; + let certificates = load_certs(&ca)?; + for cert in certificates { + root_cert_store.add(&cert).unwrap(); + } + } else { + for cert in load_native_certs().expect("could not load platform certs") { + root_cert_store.add(&Certificate(cert.0)).unwrap(); + } + } + + let builder = ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_cert_store); + + let tls_config = if let Some((Some(client_cert), Some(client_key))) = c + .tls + .as_ref() + .and_then(|tls| Some((tls.cert.as_ref(), tls.key.as_ref()))) + { + let client_cert = client_cert.sub_env_vars().map_err(|e| anyhow!("{}", e))?; + let client_key = client_key.sub_env_vars().map_err(|e| anyhow!("{}", e))?; + let certs = load_certs(&client_cert)?; + let key = load_private_key(&client_key)?; + + builder.with_client_auth_cert(certs, key)? + } else { + builder.with_no_client_auth() + }; + + options.set_transport(rumqttc::Transport::tls_with_config( + rumqttc::TlsConfiguration::Rustls(Arc::new(tls_config)), + )); + } + + let password = if let Some(password) = &c.password { + password.sub_env_vars().map_err(|e| anyhow!("{}", e))? 
+ } else { + "".to_string() + }; + + if let Some(username) = &c.username { + options.set_credentials( + username.sub_env_vars().map_err(|e| anyhow!("{}", e))?, + password, + ); + } + + Ok(AsyncClient::new(options, 100)) +} diff --git a/crates/arroyo-connectors/src/mqtt/mqtt.svg b/crates/arroyo-connectors/src/mqtt/mqtt.svg new file mode 100644 index 000000000..1a0351711 --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/mqtt.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/crates/arroyo-connectors/src/mqtt/profile.json b/crates/arroyo-connectors/src/mqtt/profile.json new file mode 100644 index 000000000..42034eb92 --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/profile.json @@ -0,0 +1,54 @@ +{ + "type": "object", + "title": "MqttConfig", + "properties": { + "tls": { + "title": "TLS", + "type": "object", + "properties": { + "ca": { + "title": "CA", + "type": "string", + "description": "The path to the CA file", + "format": "var-str" + }, + "cert": { + "title": "Cert", + "type": "string", + "description": "The path to the client cert file", + "format": "var-str" + }, + "key": { + "title": "Key", + "type": "string", + "description": "The path to the client key file", + "format": "var-str" + } + } + }, + "url": { + "title": "Url", + "type": "string", + "description": "The url of the broker to connect to. e.g. tcp://localhost. Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, to denote the protocol for establishing a connection with the broker. `mqtts://`, `ssl://` will use the native certificates if no ca is specified" + }, + "clientPrefix": { + "type": "string", + "title": "Client Prefix", + "description": "Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`id`_`timestamp`. 
Defaults to `arroyo-mqtt`" + }, + "username": { + "title": "Username", + "type": "string", + "description": "The username for your mqtt cluster (if using auth)", + "format": "var-str" + }, + "password": { + "title": "Password", + "type": "string", + "description": "The password for your mqtt cluster (if using auth)", + "format": "var-str" + } + }, + "sensitive": ["password"], + "required": ["url"] +} diff --git a/crates/arroyo-connectors/src/mqtt/sink/mod.rs b/crates/arroyo-connectors/src/mqtt/sink/mod.rs new file mode 100644 index 000000000..3bb1b1dc4 --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/sink/mod.rs @@ -0,0 +1,127 @@ +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::time::Duration; + +use crate::mqtt::MqttConfig; +use arroyo_formats::ser::ArrowSerializer; +use arroyo_operator::context::ArrowContext; +use arroyo_operator::operator::ArrowOperator; +use arroyo_rpc::formats::Format; +use arroyo_rpc::ControlResp; +use rumqttc::v5::mqttbytes::QoS; +use rumqttc::v5::AsyncClient; +use rumqttc::v5::ConnectionError; + +#[cfg(test)] +mod test; + +pub struct MqttSinkFunc { + pub config: MqttConfig, + pub qos: QoS, + pub topic: String, + pub retain: bool, + pub serializer: ArrowSerializer, + pub client: Option<AsyncClient>, + pub stopped: Arc<AtomicBool>, +} + +impl MqttSinkFunc { + pub fn new(config: MqttConfig, qos: QoS, topic: String, retain: bool, format: Format) -> Self { + Self { + config, + qos, + topic, + retain, + serializer: ArrowSerializer::new(format), + client: None, + stopped: Arc::new(AtomicBool::new(false)), + } + } +} + +#[async_trait] +impl ArrowOperator for MqttSinkFunc { + fn name(&self) -> String { + format!("mqtt-producer-{}", self.topic) + } + async fn on_start(&mut self, ctx: &mut ArrowContext) { + let mut attempts = 0; + while attempts < 20 { + match super::create_connection(&self.config, ctx.task_info.task_index) { + Ok((client, mut eventloop)) => { + self.client = Some(client); + let stopped = self.stopped.clone(); + tokio::spawn(async move { + while !stopped.load(std::sync::atomic::Ordering::Relaxed) { + match eventloop.poll().await { + Ok(_) => (), + Err(err) => match err { + ConnectionError::Timeout(_) => (), + ConnectionError::MqttState(rumqttc::v5::StateError::Io( + err, + )) + | ConnectionError::Io(err) + if err.kind() == std::io::ErrorKind::ConnectionAborted + || err.kind() + == std::io::ErrorKind::ConnectionReset => + { + continue; + } + err => { + tracing::error!("Failed to poll mqtt eventloop: {:?}", err); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }, + } + } + }); + return; + } + Err(e) => { + ctx.report_error("Failed to connect", e.to_string()).await; + } + }; + + tokio::time::sleep(Duration::from_millis((50 * (1 << attempts)).min(5_000))).await; + attempts += 1; + } + + panic!("Failed to establish connection to mqtt after 20 retries"); + } + + async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) { + for v in self.serializer.serialize(&batch) { + match self + .client + .as_mut() + .unwrap() + .publish(&self.topic, self.qos, self.retain, v) + .await + { + Ok(_) => (), + Err(e) => { + ctx.control_tx + .send(ControlResp::Error { + operator_id: ctx.task_info.operator_id.clone(), + task_index: ctx.task_info.task_index, + message: "Could not write to mqtt".to_string(), + details: format!("{:?}", e), + }) + .await + .unwrap(); + + panic!("Could not write to mqtt: {:?}", e); + } + } + } + } +} + +impl Drop for MqttSinkFunc { + fn drop(&mut self) {
self.stopped + .store(true, std::sync::atomic::Ordering::Relaxed); + } +} diff --git a/crates/arroyo-connectors/src/mqtt/sink/test.rs b/crates/arroyo-connectors/src/mqtt/sink/test.rs new file mode 100644 index 000000000..6c2e17678 --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/sink/test.rs @@ -0,0 +1,176 @@ +use arrow::array::{RecordBatch, StringArray}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::mqtt::{create_connection, MqttConfig, Tls}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arroyo_operator::context::ArrowContext; +use arroyo_operator::operator::ArrowOperator; +use arroyo_rpc::df::ArroyoSchema; +use arroyo_rpc::{ + formats::{Format, JsonFormat}, + var_str::VarStr, +}; +use arroyo_types::get_test_task_info; +use parquet::data_type::AsBytes; +use rumqttc::{ + v5::{mqttbytes::QoS, Event, Incoming}, + Outgoing, +}; +use serde::Deserialize; +use tokio::sync::mpsc::channel; + +use super::MqttSinkFunc; + +fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Utf8, + false, + )])) +} + +#[derive(Deserialize)] +struct TestData { + value: String, +} + +pub struct MqttTopicTester { + topic: String, + port: u16, + ca: Option, + cert: Option, + key: Option, + username: Option, + password: Option, +} + +impl MqttTopicTester { + fn get_config(&self) -> MqttConfig { + MqttConfig { + url: format!("tcp://localhost:{}", self.port), + client_prefix: Some("test".to_string()), + username: self.username.as_ref().map(|u| VarStr::new(u.clone())), + password: self.password.as_ref().map(|p| VarStr::new(p.clone())), + tls: Some(Tls { + ca: self.ca.as_ref().map(|ca| VarStr::new(ca.clone())), + cert: self.cert.as_ref().map(|ca| VarStr::new(ca.clone())), + key: self.key.as_ref().map(|ca| VarStr::new(ca.clone())), + }), + } + } + + async fn get_client(&self) -> (rumqttc::v5::AsyncClient, rumqttc::v5::EventLoop) { + let config = self.get_config(); + create_connection(&config, 0).expect("Failed to create connection") + } + + async fn get_sink_with_writes(&self) -> MqttSinkWithWrites { + let config = self.get_config(); + let mut mqtt = MqttSinkFunc::new( + config, + QoS::AtLeastOnce, + self.topic.clone(), + false, + Format::Json(JsonFormat::default()), + ); + + let (_, control_rx) = channel(128); + let (command_tx, _) = channel(128); + + let task_info = get_test_task_info(); + + let mut ctx = ArrowContext::new( + task_info, + None, + control_rx, + command_tx, + 1, + vec![ArroyoSchema::new_unkeyed(schema(), 0)], + None, + None, + vec![vec![]], + HashMap::new(), + ) + .await; + + mqtt.on_start(&mut ctx).await; + + MqttSinkWithWrites { sink: mqtt, ctx } + } +} + +struct MqttSinkWithWrites { + sink: MqttSinkFunc, + ctx: ArrowContext, +} + +#[tokio::test] +async fn test_mqtt() { + let mqtt_tester = MqttTopicTester { + topic: "mqtt-arroyo-test-sink".to_string(), + port: 1883, + ca: None, + cert: None, + key: None, + username: None, + password: None, + }; + + let mut sink_with_writes = mqtt_tester.get_sink_with_writes().await; + let (client, mut eventloop) = mqtt_tester.get_client().await; + + client + .subscribe(&mqtt_tester.topic, QoS::AtLeastOnce) + .await + .unwrap(); + let start = std::time::Instant::now(); + + loop { + match eventloop.poll().await { + Ok(Event::Outgoing(Outgoing::Subscribe(_))) => { + break; + } + _ => { + if start.elapsed().as_secs() > 5 { + panic!("Failed to subscribe to topic"); + } + } + } + } + + for message in 1u32..200 { + let data = StringArray::from_iter_values(vec![message.to_string()].into_iter()); + 
let batch = RecordBatch::try_new(schema(), vec![Arc::new(data)]).unwrap(); + + sink_with_writes + .sink + .process_batch(batch, &mut sink_with_writes.ctx) + .await; + } + + let mut message = 1u32; + + loop { + match eventloop.poll().await { + Ok(Event::Incoming(Incoming::Publish(p))) => { + let result: TestData = serde_json::from_slice(p.payload.as_bytes()).unwrap(); + assert_eq!( + message.to_string(), + result.value, + "{} {:?}", + message, + String::from_utf8_lossy(p.payload.as_bytes()) + ); + message += 1; + if message >= 200 { + break; + } + } + Ok(_) => (), + Err(err) => { + panic!("Error in mqtt event loop: {:?}", err); + } + } + } +} diff --git a/crates/arroyo-connectors/src/mqtt/source/mod.rs b/crates/arroyo-connectors/src/mqtt/source/mod.rs new file mode 100644 index 000000000..62b985765 --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/source/mod.rs @@ -0,0 +1,212 @@ +use async_trait::async_trait; +use std::collections::HashMap; +use std::num::NonZeroU32; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use arroyo_rpc::formats::{BadData, Format, Framing}; +use arroyo_rpc::{grpc::StopMode, ControlMessage, ControlResp}; +use arroyo_types::{ArrowMessage, SignalMessage, UserError, Watermark}; +use governor::{Quota, RateLimiter as GovernorRateLimiter}; +use rumqttc::v5::mqttbytes::QoS; +use rumqttc::v5::{ConnectionError, Event as MqttEvent, Incoming}; +use rumqttc::Outgoing; + +use crate::mqtt::{create_connection, MqttConfig}; +use arroyo_operator::context::ArrowContext; +use arroyo_operator::operator::SourceOperator; +use arroyo_operator::SourceFinishType; +use arroyo_rpc::grpc::TableConfig; +use tokio::select; +use tokio::time::MissedTickBehavior; + +#[cfg(test)] +mod test; + +pub struct MqttSourceFunc { + pub config: MqttConfig, + pub topic: String, + pub qos: QoS, + pub format: Format, + pub framing: Option, + pub bad_data: Option, + pub messages_per_second: NonZeroU32, + pub subscribed: Arc, +} + +#[async_trait] +impl SourceOperator for MqttSourceFunc { + fn name(&self) -> String { + format!("mqtt-{}", self.topic) + } + + fn tables(&self) -> HashMap { + arroyo_state::global_table_config("m", "mqtt source state") + } + + async fn run(&mut self, ctx: &mut ArrowContext) -> SourceFinishType { + match self.run_int(ctx).await { + Ok(r) => r, + Err(e) => { + ctx.control_tx + .send(ControlResp::Error { + operator_id: ctx.task_info.operator_id.clone(), + task_index: ctx.task_info.task_index, + message: e.name.clone(), + details: e.details.clone(), + }) + .await + .unwrap(); + + panic!("{}: {}", e.name, e.details); + } + } + } +} + +impl MqttSourceFunc { + pub fn new( + config: MqttConfig, + topic: String, + qos: QoS, + format: Format, + framing: Option, + bad_data: Option, + messages_per_second: u32, + ) -> Self { + Self { + config, + topic, + qos, + format, + framing, + bad_data, + messages_per_second: NonZeroU32::new(messages_per_second).unwrap(), + subscribed: Arc::new(AtomicBool::new(false)), + } + } + + pub fn subscribed(&self) -> Arc { + self.subscribed.clone() + } + + async fn run_int(&mut self, ctx: &mut ArrowContext) -> Result { + ctx.initialize_deserializer( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + ); + + if ctx.task_info.task_index > 0 { + tracing::warn!( + "Mqtt Consumer {}-{} can only be executed on a single worker... 
setting idle", + ctx.task_info.operator_id, + ctx.task_info.task_index + ); + ctx.broadcast(ArrowMessage::Signal(SignalMessage::Watermark( + Watermark::Idle, + ))) + .await; + } + + let (client, mut eventloop) = + match create_connection(&self.config, ctx.task_info.task_index) { + Ok(c) => c, + Err(e) => { + return Err(UserError { + name: "MqttSourceError".to_string(), + details: format!("Failed to create connection: {}", e), + }); + } + }; + + match client.subscribe(self.topic.clone(), self.qos).await { + Ok(_) => (), + Err(e) => { + return Err(UserError { + name: "MqttSourceError".to_string(), + details: format!("Failed to subscribe to topic: {}", e), + }); + } + } + + let rate_limiter = GovernorRateLimiter::direct(Quota::per_second(self.messages_per_second)); + + let topic = self.topic.clone(); + let qos = self.qos; + let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); + flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + select! { + event = eventloop.poll() => { + match event { + Ok(MqttEvent::Incoming(Incoming::Publish(p))) => { + ctx.deserialize_slice(&p.payload, SystemTime::now()).await?; + rate_limiter.until_ready().await; + } + Ok(MqttEvent::Outgoing(Outgoing::Subscribe(_))) => { + self.subscribed.store(true, Ordering::Relaxed); + } + Ok(_) => (), + Err(err) => { + if let ConnectionError::Timeout(_) = err { + continue; + } + tracing::error!("Failed to poll mqtt eventloop: {}", err); + if let Err(err) = client + .subscribe( + topic.clone(), + qos, + ) + .await { + return Err(UserError { + name: "MqttSourceError".to_string(), + details: format!("Error while subscribing to mqtt topic {}: {:?}", topic, err), + }); + } + } + } + } + _ = flush_ticker.tick() => { + if ctx.should_flush() { + ctx.flush_buffer().await?; + } + } + control_message = ctx.control_rx.recv() => { + match control_message { + Some(ControlMessage::Checkpoint(c)) => { + tracing::debug!("starting checkpointing {}", ctx.task_info.task_index); + if self.start_checkpoint(c, ctx).await { + return Ok(SourceFinishType::Immediate); + } + }, + Some(ControlMessage::Stop { mode }) => { + tracing::info!("Stopping Mqtt source: {:?}", mode); + + match mode { + StopMode::Graceful => { + return Ok(SourceFinishType::Graceful); + } + StopMode::Immediate => { + return Ok(SourceFinishType::Immediate); + } + } + } + Some(ControlMessage::Commit { .. 
}) => { + unreachable!("sources shouldn't receive commit messages"); + } + Some(ControlMessage::LoadCompacted {compacted}) => { + ctx.load_compacted(compacted).await; + } + Some(ControlMessage::NoOp) => {} + None => { + + } + } + } + } + } + } +} diff --git a/crates/arroyo-connectors/src/mqtt/source/test.rs b/crates/arroyo-connectors/src/mqtt/source/test.rs new file mode 100644 index 000000000..e5eecfb3d --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/source/test.rs @@ -0,0 +1,222 @@ +use arrow::array::UInt64Array; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use crate::mqtt::{create_connection, MqttConfig, Tls}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use arroyo_operator::context::{ArrowContext, QueueItem}; +use arroyo_operator::operator::SourceOperator; +use arroyo_rpc::df::ArroyoSchema; +use arroyo_rpc::formats::{Format, JsonFormat}; +use arroyo_rpc::var_str::VarStr; +use arroyo_rpc::{ControlMessage, ControlResp}; +use arroyo_types::{ArrowMessage, TaskInfo}; +use rand::random; +use rumqttc::v5::mqttbytes::QoS; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +use super::MqttSourceFunc; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct TestData { + value: u64, +} + +struct MqttSourceWithReads { + to_control_tx: Sender, + #[allow(dead_code)] + from_control_rx: Receiver, + data_recv: Receiver, + subscribed: Arc, +} + +impl MqttSourceWithReads { + async fn wait_for_subscription(&self, timeout: std::time::Duration) { + let start = std::time::Instant::now(); + while !self.subscribed.load(Ordering::Relaxed) { + if start.elapsed() > timeout { + panic!("Timed out waiting for subscription"); + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + + async fn assert_next_message_record_value(&mut self, mut expected_values: VecDeque) { + match self.data_recv.recv().await { + Some(item) => { + if let ArrowMessage::Data(record) = item { + let a = record.columns()[1] + .as_any() + .downcast_ref::() + .unwrap(); + + for v in a { + assert_eq!( + expected_values + .pop_front() + .expect("found more elements than expected"), + v.unwrap() + ); + } + } else { + unreachable!("expected data, got {:?}", item); + } + } + None => { + unreachable!("option shouldn't be missing") + } + } + } +} + +pub struct MqttTopicTester { + topic: String, + port: u16, + ca: Option, + cert: Option, + key: Option, + username: Option, + password: Option, +} + +impl MqttTopicTester { + fn get_config(&self) -> MqttConfig { + MqttConfig { + url: format!("tcp://localhost:{}", self.port), + client_prefix: Some("test".to_string()), + username: self.username.as_ref().map(|u| VarStr::new(u.clone())), + password: self.password.as_ref().map(|p| VarStr::new(p.clone())), + tls: Some(Tls { + ca: self.ca.as_ref().map(|ca| VarStr::new(ca.clone())), + cert: self.cert.as_ref().map(|ca| VarStr::new(ca.clone())), + key: self.key.as_ref().map(|ca| VarStr::new(ca.clone())), + }), + } + } + + async fn get_client(&self) -> rumqttc::v5::AsyncClient { + let config = self.get_config(); + let (client, mut eventloop) = + create_connection(&config, 0).expect("Failed to create connection"); + + tokio::spawn(async move { + loop { + let event = eventloop.poll().await; + if let Err(err) = event { + tracing::error!("Error in mqtt event loop: {:?}", err); + panic!("Error in mqtt event loop: {:?}", err); + } + } + }); + + client + } + + async fn get_source_with_reader(&self, task_info: 
TaskInfo) -> MqttSourceWithReads { + let config = self.get_config(); + + let mut mqtt = MqttSourceFunc::new( + config, + self.topic.clone(), + QoS::AtLeastOnce, + Format::Json(JsonFormat::default()), + None, + None, + 10, + ); + + let (to_control_tx, control_rx) = channel(128); + let (command_tx, from_control_rx) = channel(128); + let (data_tx, recv) = channel(128); + + let mut ctx = ArrowContext::new( + task_info, + None, + control_rx, + command_tx, + 1, + vec![], + Some(ArroyoSchema::new_unkeyed( + Arc::new(Schema::new(vec![ + Field::new( + "_timestamp", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("value", DataType::UInt64, false), + ])), + 0, + )), + None, + vec![vec![data_tx]], + mqtt.tables(), + ) + .await; + + let subscribed = mqtt.subscribed(); + tokio::spawn(async move { + mqtt.on_start(&mut ctx).await; + mqtt.run(&mut ctx).await; + }); + + MqttSourceWithReads { + to_control_tx, + from_control_rx, + data_recv: recv, + subscribed, + } + } +} + +#[tokio::test] +async fn test_mqtt() { + let mqtt_tester = MqttTopicTester { + topic: "mqtt-arroyo-test".to_string(), + port: 1883, + ca: None, + cert: None, + key: None, + username: None, + password: None, + }; + + let mut task_info = arroyo_types::get_test_task_info(); + task_info.job_id = format!("mqtt-job-{}", random::()); + + let mut reader = mqtt_tester.get_source_with_reader(task_info.clone()).await; + + reader + .wait_for_subscription(std::time::Duration::from_secs(5)) + .await; + + let client = mqtt_tester.get_client().await; + + let mut expected = vec![]; + for message in 1u64..20 { + let data = TestData { value: message }; + expected.push(message); + client + .publish( + &mqtt_tester.topic, + QoS::AtLeastOnce, + false, + serde_json::to_vec(&data).unwrap(), + ) + .await + .expect("Failed to publish message"); + } + + reader + .assert_next_message_record_value(expected.into()) + .await; + + reader + .to_control_tx + .send(ControlMessage::Stop { + mode: arroyo_rpc::grpc::StopMode::Graceful, + }) + .await + .unwrap(); +} diff --git a/crates/arroyo-connectors/src/mqtt/table.json b/crates/arroyo-connectors/src/mqtt/table.json new file mode 100644 index 000000000..0070d81a9 --- /dev/null +++ b/crates/arroyo-connectors/src/mqtt/table.json @@ -0,0 +1,43 @@ +{ + "type": "object", + "title": "MqttTable", + "properties": { + "topic": { + "title": "Topic", + "type": "string", + "description": "The MQTT topic to use for this table" + }, + "qos": { + "type": "string", + "title": "Quality of Service", + "description": "The Quality of Service to use for this topic", + "enum": ["AtMostOnce", "AtLeastOnce", "ExactlyOnce"] + }, + "type": { + "type": "object", + "title": "Table Type", + "oneOf": [ + { + "type": "object", + "title": "Source", + "additionalProperties": false, + "properties": {} + }, + { + "type": "object", + "title": "Sink", + "properties": { + "retain": { + "type": "boolean", + "title": "Retain", + "description": "Whether to retain messages published to this topic" + } + }, + "required": ["retain"], + "additionalProperties": false + } + ] + } + }, + "required": ["topic", "type"] +} diff --git a/crates/arroyo-connectors/src/polling_http/mod.rs b/crates/arroyo-connectors/src/polling_http/mod.rs index 9deb64e3f..624031628 100644 --- a/crates/arroyo-connectors/src/polling_http/mod.rs +++ b/crates/arroyo-connectors/src/polling_http/mod.rs @@ -197,7 +197,7 @@ impl Connector for PollingHTTPConnector { let description = format!("PollingHTTPSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - 
string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?, ':').ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" @@ -247,6 +247,7 @@ impl Connector for PollingHTTPConnector { .as_ref() .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) .unwrap_or("".to_string()), + ':', ) .expect("Invalid header map") .into_iter() diff --git a/crates/arroyo-connectors/src/sse/mod.rs b/crates/arroyo-connectors/src/sse/mod.rs index 91e17fc8d..63ce22403 100644 --- a/crates/arroyo-connectors/src/sse/mod.rs +++ b/crates/arroyo-connectors/src/sse/mod.rs @@ -82,7 +82,7 @@ impl Connector for SSEConnector { let description = format!("SSESource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?, ':').ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" @@ -192,6 +192,7 @@ impl SseTester { .map(|s| s.sub_env_vars()) .transpose()? .unwrap_or("".to_string()), + ':', ) .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))?; diff --git a/crates/arroyo-connectors/src/sse/operator.rs b/crates/arroyo-connectors/src/sse/operator.rs index 5ac056801..3cb3dda06 100644 --- a/crates/arroyo-connectors/src/sse/operator.rs +++ b/crates/arroyo-connectors/src/sse/operator.rs @@ -41,7 +41,7 @@ impl SSESourceFunc { Ok(OperatorNode::from_source(Box::new(SSESourceFunc { url: table.endpoint, - headers: string_to_map(&headers.unwrap_or("".to_string())) + headers: string_to_map(&headers.unwrap_or("".to_string()), ':') .expect("Invalid header map") .into_iter() .collect(), diff --git a/crates/arroyo-connectors/src/websocket/mod.rs b/crates/arroyo-connectors/src/websocket/mod.rs index 2fc4a4997..7c41e909b 100644 --- a/crates/arroyo-connectors/src/websocket/mod.rs +++ b/crates/arroyo-connectors/src/websocket/mod.rs @@ -89,7 +89,7 @@ impl Connector for WebsocketConnector { } }; - let headers = match string_to_map(&headers_str.unwrap_or("".to_string())) + let headers = match string_to_map(&headers_str.unwrap_or("".to_string()), ':') .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs")) { Ok(headers) => headers, @@ -224,7 +224,7 @@ impl Connector for WebsocketConnector { let description = format!("WebsocketSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?, ':').ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" diff --git a/crates/arroyo-formats/src/de.rs b/crates/arroyo-formats/src/de.rs index 6a441676b..bba08a5f2 100644 --- a/crates/arroyo-formats/src/de.rs +++ b/crates/arroyo-formats/src/de.rs @@ -201,7 +201,10 @@ impl ArrowDeserializer { Format::RawString(_) | Format::Json(JsonFormat { unstructured: true, .. - }) => self.deserialize_raw_string(buffer, msg), + }) => { + self.deserialize_raw_string(buffer, msg); + add_timestamp(buffer, self.schema.timestamp_index, timestamp); + } Format::Json(json) => { let msg = if json.confluent_schema_registry { &msg[5..] 
@@ -223,8 +226,6 @@ impl ArrowDeserializer {
             Format::Parquet(_) => todo!("parquet is not supported as an input format"),
         }

-        add_timestamp(buffer, self.schema.timestamp_index, timestamp);
-
         Ok(())
     }
diff --git a/crates/arroyo-operator/src/connector.rs b/crates/arroyo-operator/src/connector.rs
index 812b2e9f9..a00ca1036 100644
--- a/crates/arroyo-operator/src/connector.rs
+++ b/crates/arroyo-operator/src/connector.rs
@@ -105,9 +105,7 @@ pub trait Connector: Send {
         profile: Self::ProfileT,
         table: Self::TableT,
         config: OperatorConfig,
-    ) -> anyhow::Result {
-        todo!("constructor")
-    }
+    ) -> anyhow::Result;
 }

 pub trait ErasedConnector: Send {
diff --git a/crates/arroyo-operator/src/context.rs b/crates/arroyo-operator/src/context.rs
index beb4542e7..405bcfe1c 100644
--- a/crates/arroyo-operator/src/context.rs
+++ b/crates/arroyo-operator/src/context.rs
@@ -101,10 +101,6 @@ impl ContextBuffer {
         }
     }

-    fn buffer(&mut self) -> &mut Vec> {
-        &mut self.buffer
-    }
-
     pub fn size(&self) -> usize {
         self.buffer[0].len()
     }
@@ -451,13 +447,6 @@ impl ArrowContext {
             .unwrap_or(false)
     }

-    pub fn buffer(&mut self) -> &mut Vec> {
-        self.buffer
-            .as_mut()
-            .expect("tried to get buffer for node without out schema")
-            .buffer()
-    }
-
     pub async fn broadcast(&mut self, message: ArrowMessage) {
         if let Err(e) = self.flush_buffer().await {
             self.buffered_error.replace(e);
diff --git a/crates/arroyo-rpc/src/schema_resolver.rs b/crates/arroyo-rpc/src/schema_resolver.rs
index d8762545f..cea3cc0fc 100644
--- a/crates/arroyo-rpc/src/schema_resolver.rs
+++ b/crates/arroyo-rpc/src/schema_resolver.rs
@@ -232,7 +232,7 @@ impl ConfluentSchemaRegistryClient {
         match status {
             StatusCode::CONFLICT => {
                 bail!(
-                    "there is already an existing schema for this topic which is \
+                    "there is already an existing schema for this subject which is \
                     incompatible with the new schema being registered:\n\n{}",
                     body
                 )
@@ -244,7 +244,7 @@ impl ConfluentSchemaRegistryClient {
                 bail!("invalid credentials for schema registry");
             }
             StatusCode::NOT_FOUND => {
-                bail!("schema not found; make sure that the topic exists")
+                bail!("schema not found; make sure that the subject exists")
             }
             code => {
                 bail!("schema registry returned error {}: {}", code.as_u16(), body);
@@ -306,26 +306,26 @@ impl ConfluentSchemaRegistryClient {

 pub struct ConfluentSchemaRegistry {
     client: ConfluentSchemaRegistryClient,
-    topic: String,
+    subject: String,
 }

 impl ConfluentSchemaRegistry {
     pub fn new(
         endpoint: &str,
-        topic: &str,
+        subject: &str,
         api_key: Option,
         api_secret: Option,
     ) -> anyhow::Result {
         Ok(Self {
             client: ConfluentSchemaRegistryClient::new(endpoint, api_key, api_secret)?,
-            topic: topic.to_string(),
+            subject: subject.to_string(),
         })
     }

-    fn topic_endpoint(&self) -> Url {
+    fn subject_endpoint(&self) -> Url {
         self.client
             .endpoint
-            .join(&format!("subjects/{}-value/versions/", self.topic))
+            .join(&format!("subjects/{}/versions/", self.subject))
             .unwrap()
     }

@@ -335,9 +335,9 @@ impl ConfluentSchemaRegistry {
         schema_type: ConfluentSchemaType,
     ) -> anyhow::Result {
         self.client
-            .write_schema(self.topic_endpoint(), schema, schema_type)
+            .write_schema(self.subject_endpoint(), schema, schema_type)
             .await
-            .context(format!("topic '{}'", self.topic))
+            .context(format!("subject '{}'", self.subject))
     }

     pub async fn get_schema_for_id(
@@ -350,10 +350,10 @@ impl ConfluentSchemaRegistry {
             .join(&format!("/schemas/ids/{}", id))
             .unwrap();

-        self.client
-            .get_schema_for_url(url)
-            .await
-            .context(format!("failed to fetch schema for topic '{}'", self.topic))
+        self.client.get_schema_for_url(url).await.context(format!(
+            "failed to fetch schema for subject '{}'",
+            self.subject
+        ))
     }

     pub async fn get_schema_for_version(
@@ -364,11 +364,11 @@ impl ConfluentSchemaRegistry {
             .map(|v| format!("{}", v))
             .unwrap_or_else(|| "latest".to_string());

-        let url = self.topic_endpoint().join(&version).unwrap();
+        let url = self.subject_endpoint().join(&version).unwrap();

         self.client.get_schema_for_url(url).await.context(format!(
-            "failed to fetch schema for topic '{}' with version {}",
-            self.topic, version
+            "failed to fetch schema for subject '{}' with version {}",
+            self.subject, version
         ))
     }
 }
diff --git a/crates/arroyo-types/src/lib.rs b/crates/arroyo-types/src/lib.rs
index ac93d134a..b75387d41 100644
--- a/crates/arroyo-types/src/lib.rs
+++ b/crates/arroyo-types/src/lib.rs
@@ -333,14 +333,14 @@ pub fn days_since_epoch(time: SystemTime) -> i32 {
         .div_euclid(86400) as i32
 }

-pub fn string_to_map(s: &str) -> Option> {
+pub fn string_to_map(s: &str, pair_delimeter: char) -> Option> {
     if s.trim().is_empty() {
         return Some(HashMap::new());
     }

     s.split(',')
         .map(|s| {
-            let mut kv = s.trim().split(':');
+            let mut kv = s.trim().split(pair_delimeter);
             Some((kv.next()?.trim().to_string(), kv.next()?.trim().to_string()))
         })
         .collect()
@@ -521,6 +521,7 @@ impl<'de> Deserialize<'de> for DebeziumOp {
             "c" => Ok(DebeziumOp::Create),
             "u" => Ok(DebeziumOp::Update),
             "d" => Ok(DebeziumOp::Delete),
+            "r" => Ok(DebeziumOp::Create),
             _ => Err(serde::de::Error::custom(format!(
                 "Invalid DebeziumOp {}",
                 s
diff --git a/crates/arroyo-worker/src/connectors/mod.rs b/crates/arroyo-worker/src/connectors/mod.rs
index 7d9ee4a9e..352dc7a3b 100644
--- a/crates/arroyo-worker/src/connectors/mod.rs
+++ b/crates/arroyo-worker/src/connectors/mod.rs
@@ -4,6 +4,7 @@ pub mod fluvio;
 pub mod impulse;
 pub mod kafka;
 pub mod kinesis;
+pub mod mqtt;
 pub mod nexmark;
 pub mod polling_http;
 pub mod redis;
diff --git a/k8s/arroyo/templates/api.yaml b/k8s/arroyo/templates/api.yaml
new file mode 100644
index 000000000..bb32e0f83
--- /dev/null
+++ b/k8s/arroyo/templates/api.yaml
@@ -0,0 +1,107 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ include "arroyo.fullname" . }}-api
+  labels:
+    {{- include "arroyo.labels" . | nindent 4 }}
+    app: {{ include "arroyo.fullname" . }}-api
+spec:
+  replicas: {{ .Values.api.replicas }}
+  selector:
+    matchLabels:
+      app: {{ include "arroyo.fullname" . }}-api
+  template:
+    metadata:
+      labels:
+        {{- include "arroyo.labels" . | nindent 8 }}
+        app: {{ include "arroyo.fullname" . }}-api
+      annotations:
+        {{- if .Values.prometheus.setAnnotations }}
+        prometheus.io/scrape: "true"
+        prometheus.io/path: /metrics
+        prometheus.io/port: "8001"
+        {{- end }}
+        {{- with .Values.podAnnotations }}
+        {{- toYaml . | nindent 8 }}
+        {{- end }}
+
+    spec:
+      {{- with .Values.imagePullSecrets }}
+      imagePullSecrets:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      serviceAccountName: {{ include "arroyo.serviceAccountName" . }}
+      securityContext:
+        {{- toYaml .Values.podSecurityContext | nindent 8 }}
+      containers:
+      - name: arroyo-api
+        securityContext:
+          {{- toYaml .Values.securityContext | nindent 12 }}
+        image: "{{ .Values.api.image.repository }}:{{ .Values.api.image.tag }}"
+        imagePullPolicy: {{ .Values.api.image.pullPolicy }}
+        args: ["api"]
+        env:
+        {{- include "arroyo.databaseEnvVars" . | nindent 8 }}
+        - name: CONTROLLER_ADDR
+          value: "http://{{ include "arroyo.fullname" . }}-controller:9190"
+        - name: PROM_ENDPOINT
+          value: "{{ include "arroyo.prometheusEndpoint" .}}"
+        - name: API_METRICS_RATE
+          value: "{{ .Values.prometheus.queryRate }}"
+        {{- include "arroyo.existingConfigMap" . | nindent 8 }}
+        ports:
+        - containerPort: 8000
+          name: http
+        - containerPort: 8001
+          name: admin
+        livenessProbe:
+          httpGet:
+            path: /status
+            port: admin
+          initialDelaySeconds: 5
+        readinessProbe:
+          httpGet:
+            path: /status
+            port: admin
+          initialDelaySeconds: 5
+        {{- if .Values.api.resources }}
+        resources: {{- toYaml .Values.api.resources | nindent 12 }}
+        {{- end }}
+        volumeMounts:
+          {{- if .Values.volumeMounts }}
+          {{- include "tplvalues.render" (dict "value" .Values.volumeMounts "context" $) | nindent 10 }}
+          {{- end }}
+      {{- with .Values.nodeSelector }}
+      nodeSelector:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      {{- with .Values.affinity }}
+      affinity:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      {{- with .Values.tolerations }}
+      tolerations:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      volumes:
+        {{- if .Values.volumes }}
+        {{- include "tplvalues.render" (dict "value" .Values.volumes "context" $) | nindent 10 }}
+        {{- end }}
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ include "arroyo.fullname" . }}-api
+spec:
+  selector:
+    app: {{ include "arroyo.fullname" . }}-api
+  ports:
+  - name: http
+    protocol: TCP
+    port: {{ .Values.api.service.httpPort }}
+    targetPort: 8000
+  - name: admin
+    protocol: TCP
+    port: {{ .Values.api.service.adminPort }}
+    targetPort: 8001
diff --git a/webui/src/routes/connections/JsonForm.tsx b/webui/src/routes/connections/JsonForm.tsx
index ba556c93f..4c0e545fd 100644
--- a/webui/src/routes/connections/JsonForm.tsx
+++ b/webui/src/routes/connections/JsonForm.tsx
@@ -15,6 +15,7 @@ import {
   Select,
   Spinner,
   Stack,
+  Switch,
   Text,
   Textarea,
   Tooltip,
@@ -166,6 +167,49 @@ function NumberWidget({
   );
 }

+function BooleanWidget({
+  path,
+  title,
+  description,
+  value,
+  errors,
+  onChange,
+  readonly,
+}: {
+  path: string;
+  title: string;
+  description?: string;
+  value: boolean;
+  errors: any;
+  onChange: (e: React.ChangeEvent) => void;
+  readonly?: boolean;
+}) {
+  useEffect(() => {
+    if (!value) {
+      // @ts-ignore
+      onChange({ target: { name: path, value: false } });
+    }
+  }, [path]);
+
+  return (
+
+
+        {title}
+         onChange(e)} />
+
+      {errors[path] ? (
+        {errors[path]}
+      ) : (
+        description && (
+
+            {description}
+
+        )
+      )}
+
+  );
+}
+
 function AutocompleteWidget({
   path,
   title,
@@ -771,6 +815,19 @@ export function FormInner({
           />
         );
       }
+      case 'boolean': {
+        return (
+
+        );
+      }
       case 'array': {
         return (