diff --git a/Cargo.lock b/Cargo.lock index e7c1e4b95a37..8feab0c55c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2256,6 +2256,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -2345,6 +2351,24 @@ dependencies = [ "serde", ] +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "quanta", + "rand", + "smallvec", +] + [[package]] name = "h2" version = "0.3.18" @@ -3158,6 +3182,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "maplit" version = "1.0.2" @@ -3437,6 +3470,7 @@ dependencies = [ "enum-kinds", "fail", "futures", + "governor", "hex", "itertools", "launchdarkly-server-sdk", @@ -3782,6 +3816,8 @@ version = "0.0.0" dependencies = [ "anyhow", "async-trait", + "chrono", + "futures", "k8s-openapi", "kube", "mz-ore", @@ -5989,6 +6025,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.2" @@ -5999,6 +6041,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -7063,6 +7111,22 @@ dependencies = [ "once_cell", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "2.0.0" @@ -7135,6 +7199,15 @@ dependencies = [ "rand_core", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.5.1" diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index 0d3a84d84ef6..0b8541f69798 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -19,6 +19,7 @@ differential-dataflow = "0.12.0" enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" +governor = "0.6.0" hex = "0.4.3" itertools = "0.10.5" once_cell = "1.16.0" diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 670204ec4d59..8ad926f4e8d0 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -84,7 +84,7 @@ use mz_adapter_types::compaction::DEFAULT_LOGICAL_COMPACTION_WINDOW_TS; use mz_adapter_types::connection::ConnectionId; use mz_build_info::BuildInfo; use mz_catalog::memory::objects::{CatalogEntry, CatalogItem, Connection, DataSourceDesc, Source}; -use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig}; +use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent}; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::Plan; use mz_compute_types::ComputeInstanceId; @@ -172,6 +172,7 @@ mod ddl; mod indexes; mod introspection; mod message_handler; +mod privatelink_status; mod read_policy; mod sequencer; mod sql; @@ -234,6 +235,7 @@ pub enum Message { stage: PeekStage, }, DrainStatementLog, + PrivateLinkVpcEndpointEvents(BTreeMap), } impl Message { @@ -274,6 +276,7 @@ impl Message { Message::PeekStageReady { .. } => "peek_stage_ready", Message::DrainStatementLog => "drain_statement_log", Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready", + Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events", } } } @@ -2014,6 +2017,7 @@ impl Coordinator { }); self.schedule_storage_usage_collection().await; + self.spawn_privatelink_vpc_endpoints_watch_task(); self.spawn_statement_logging_task(); flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle); diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 70060004945d..6f425056e187 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -137,6 +137,9 @@ impl Coordinator { Message::DrainStatementLog => { self.drain_statement_log().await; } + Message::PrivateLinkVpcEndpointEvents(events) => { + self.write_privatelink_status_updates(events).await; + } } } .boxed_local() diff --git a/src/adapter/src/coord/privatelink_status.rs b/src/adapter/src/coord/privatelink_status.rs new file mode 100644 index 000000000000..77e5e0669723 --- /dev/null +++ b/src/adapter/src/coord/privatelink_status.rs @@ -0,0 +1,90 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::num::NonZeroU32; +use std::{collections::BTreeMap, sync::Arc}; + +use futures::stream::StreamExt; +use futures::FutureExt; +use governor::{Quota, RateLimiter}; + +use mz_cloud_resources::VpcEndpointEvent; +use mz_ore::task::spawn; +use mz_repr::{Datum, GlobalId, Row}; +use mz_storage_client::controller::IntrospectionType; + +use crate::coord::Coordinator; + +use super::Message; + +impl Coordinator { + pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self) { + let internal_cmd_tx = self.internal_cmd_tx.clone(); + let rate_quota: i32 = self + .catalog + .system_config() + .privatelink_status_update_quota_per_minute() + .try_into() + .expect("value coouldn't be converted to i32"); + if let Some(controller) = &self.cloud_resource_controller { + let controller = Arc::clone(controller); + spawn(|| "privatelink_vpc_endpoint_watch", async move { + let mut stream = controller.watch_vpc_endpoints().await; + // Using a per-minute quota implies a burst-size of the same amount + let rate_limiter = RateLimiter::direct(Quota::per_minute( + NonZeroU32::new(rate_quota).expect("will be non-zero"), + )); + + loop { + // Wait for an event to become available + if let Some(event) = stream.next().await { + // Wait until we're permitted to tell the coordinator about the event + rate_limiter.until_ready().await; + + // Drain any additional events from the queue that accumulated while + // waiting for the rate limiter, deduplicating by connection_id. + let mut events = BTreeMap::new(); + events.insert(event.connection_id, event); + while let Some(event) = stream.next().now_or_never().and_then(|e| e) { + events.insert(event.connection_id, event); + } + + // Send the event batch to the coordinator. + let _ = internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(events)); + } + } + }); + } + } + + pub(crate) async fn write_privatelink_status_updates( + &mut self, + events: BTreeMap, + ) { + let mut updates = Vec::new(); + for value in events.into_values() { + updates.push(( + Row::pack_slice(&[ + Datum::TimestampTz(value.time.try_into().expect("must fit")), + Datum::String(&value.connection_id.to_string()), + Datum::String(&value.status.to_string()), + ]), + 1, + )); + } + + self.controller + .storage + .record_introspection_updates( + IntrospectionType::PrivatelinkConnectionStatusHistory, + updates, + ) + .await; + } +} diff --git a/src/cloud-resources/Cargo.toml b/src/cloud-resources/Cargo.toml index 540410185725..4505a2979d4c 100644 --- a/src/cloud-resources/Cargo.toml +++ b/src/cloud-resources/Cargo.toml @@ -11,6 +11,8 @@ anyhow = "1.0.66" async-trait = "0.1.68" k8s-openapi = { version = "0.20.0", features = ["schemars", "v1_26"] } kube = { version = "0.87.1", default-features = false, features = ["client", "derive", "openssl-tls", "ws"] } +chrono = { version = "0.4.23", default-features = false } +futures = "0.3.25" mz-ore = { path = "../ore", features = [] } mz-repr = { path = "../repr" } schemars = { version = "0.8", features = ["uuid1"] } diff --git a/src/cloud-resources/src/crd/vpc_endpoint.rs b/src/cloud-resources/src/crd/vpc_endpoint.rs index a0ace5bd2901..6383a5544cc7 100644 --- a/src/cloud-resources/src/crd/vpc_endpoint.rs +++ b/src/cloud-resources/src/crd/vpc_endpoint.rs @@ -9,6 +9,7 @@ //! VpcEndpoint custom resource, to be reconciled into an AWS VPC Endpoint by the //! environment-controller. +use std::fmt; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition; use kube::CustomResource; @@ -78,6 +79,27 @@ pub mod v1 { Rejected, Unknown, } + + impl fmt::Display for VpcEndpointState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let repr = match self { + VpcEndpointState::PendingServiceDiscovery => "pending-service-discovery", + VpcEndpointState::CreatingEndpoint => "creating-endpoint", + VpcEndpointState::RecreatingEndpoint => "recreating-endpoint", + VpcEndpointState::UpdatingEndpoint => "updating-endpoint", + VpcEndpointState::Available => "available", + VpcEndpointState::Deleted => "deleted", + VpcEndpointState::Deleting => "deleting", + VpcEndpointState::Expired => "expired", + VpcEndpointState::Failed => "failed", + VpcEndpointState::Pending => "pending", + VpcEndpointState::PendingAcceptance => "pending-acceptance", + VpcEndpointState::Rejected => "rejected", + VpcEndpointState::Unknown => "unknown", + }; + write!(f, "{}", repr) + } + } } #[cfg(test)] diff --git a/src/cloud-resources/src/lib.rs b/src/cloud-resources/src/lib.rs index baa35cd13208..9b021829d741 100644 --- a/src/cloud-resources/src/lib.rs +++ b/src/cloud-resources/src/lib.rs @@ -80,13 +80,16 @@ use std::collections::BTreeMap; use std::fmt::{self, Debug}; +use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use futures::stream::BoxStream; use mz_repr::GlobalId; use serde::{Deserialize, Serialize}; -use crate::crd::vpc_endpoint::v1::VpcEndpointStatus; +use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus}; pub mod crd; @@ -153,6 +156,9 @@ pub trait CloudResourceController: CloudResourceReader { &self, ) -> Result, anyhow::Error>; + /// Lists existing `VpcEndpoint` Kubernetes objects. + async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>; + /// Returns a reader for the resources managed by this controller. fn reader(&self) -> Arc; } @@ -170,6 +176,13 @@ pub fn vpc_endpoint_name(id: GlobalId) -> String { format!("connection-{id}") } +/// Attempts to return the ID used to create the given VPC endpoint name. +pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option { + vpc_endpoint_name + .split_once('-') + .and_then(|(_, id_str)| GlobalId::from_str(id_str).ok()) +} + /// Returns the host to use for the VPC endpoint with the given ID and /// optionally in the given availability zone. pub fn vpc_endpoint_host(id: GlobalId, availability_zone: Option<&str>) -> String { @@ -181,3 +194,10 @@ pub fn vpc_endpoint_host(id: GlobalId, availability_zone: Option<&str>) -> Strin None => name, } } + +#[derive(Debug)] +pub struct VpcEndpointEvent { + pub connection_id: GlobalId, + pub status: VpcEndpointState, + pub time: DateTime, +} diff --git a/src/orchestrator-kubernetes/src/cloud_resource_controller.rs b/src/orchestrator-kubernetes/src/cloud_resource_controller.rs index 4500db3324a5..7d63257129b8 100644 --- a/src/orchestrator-kubernetes/src/cloud_resource_controller.rs +++ b/src/orchestrator-kubernetes/src/cloud_resource_controller.rs @@ -10,17 +10,24 @@ //! Management of K8S objects, such as VpcEndpoints. use std::collections::BTreeMap; -use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; +use chrono::Utc; +use futures::stream::BoxStream; +use futures::StreamExt; use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams}; +use kube::runtime::{watcher, WatchStreamExt}; use kube::{Api, ResourceExt}; use maplit::btreemap; use mz_repr::GlobalId; -use mz_cloud_resources::crd::vpc_endpoint::v1::{VpcEndpoint, VpcEndpointSpec, VpcEndpointStatus}; -use mz_cloud_resources::{CloudResourceController, CloudResourceReader, VpcEndpointConfig}; +use mz_cloud_resources::crd::vpc_endpoint::v1::{ + VpcEndpoint, VpcEndpointSpec, VpcEndpointState, VpcEndpointStatus, +}; +use mz_cloud_resources::{ + CloudResourceController, CloudResourceReader, VpcEndpointConfig, VpcEndpointEvent, +}; use crate::{util, KubernetesOrchestrator, FIELD_MANAGER}; @@ -89,11 +96,7 @@ impl CloudResourceController for KubernetesOrchestrator { let objects = self.vpc_endpoint_api.list(&ListParams::default()).await?; let mut endpoints = BTreeMap::new(); for object in objects { - let maybe_id = object - .name_any() - .split_once('-') - .and_then(|(_, id_str)| GlobalId::from_str(id_str).ok()); - let id = match maybe_id { + let id = match mz_cloud_resources::id_from_vpc_endpoint_name(&object.name_any()) { Some(id) => id, // Ignore any object whose name can't be parsed as a GlobalId None => continue, @@ -103,6 +106,63 @@ impl CloudResourceController for KubernetesOrchestrator { Ok(endpoints) } + async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent> { + let stream = watcher(self.vpc_endpoint_api.clone(), watcher::Config::default()) + .touched_objects() + .filter_map(|object| async move { + match object { + Ok(vpce) => { + if let Some(connection_id) = + mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any()) + { + if let Some(state) = + vpce.status.as_ref().and_then(|st| st.state.to_owned()) + { + return Some(VpcEndpointEvent { + connection_id, + status: state, + // Use the 'Available' Condition on the VPCE Status to set the event-time, falling back + // to now if it's not set + time: vpce + .status + .unwrap() + .conditions + .and_then(|c| { + c.into_iter().find(|c| &c.type_ == "Available") + }) + .and_then(|condition| { + Some(condition.last_transition_time.0) + }) + .unwrap_or_else(Utc::now), + }); + } else { + // The Status/State is not yet populated on the VpcEndpoint, which means it was just + // initialized and hasn't yet been reconciled by the environment-controller + // We return an event with an 'unknown' state so that watchers know the VpcEndpoint was created + // even if we don't yet have an accurate status + return Some(VpcEndpointEvent { + connection_id, + status: VpcEndpointState::Unknown, + time: vpce.creation_timestamp().expect("Will exist").0, + }); + } + // TODO: Should we also check for the deletion_timestamp on the vpce? That would indicate that the + // resource is about to be deleted; however there is already a 'deleted' enum val on VpcEndpointState + // which refers to the state of the customer's VPC Endpoint Service, so we'd need to introduce a new state val + } + None + } + Err(error) => { + // We assume that errors returned by Kubernetes are usually transient, so we + // just log a warning and ignore them otherwise. + tracing::warn!("vpc endpoint watch error: {error}"); + None + } + } + }); + Box::pin(stream) + } + fn reader(&self) -> Arc { let reader = Arc::clone(&self.resource_reader); reader diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index d27f80d5ca32..d910dbe8f523 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1325,6 +1325,14 @@ const OPTIMIZER_ONESHOT_STATS_TIMEOUT: ServerVar = ServerVar { internal: true, }; +const PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE: ServerVar = ServerVar { + name: UncasedStr::new("privatelink_status_update_quota_per_minute"), + value: &20, + description: "Sets the per-minute quota for privatelink vpc status updates to be written to \ + the storage-collection-backed system table. This value implies the total and burst quota per-minute.", + internal: true, +}; + static DEFAULT_STATEMENT_LOGGING_SAMPLE_RATE: Lazy = Lazy::new(|| 0.1.into()); pub static STATEMENT_LOGGING_SAMPLE_RATE: Lazy> = Lazy::new(|| { ServerVar { @@ -2839,6 +2847,7 @@ impl SystemVars { ) .with_var(&OPTIMIZER_STATS_TIMEOUT) .with_var(&OPTIMIZER_ONESHOT_STATS_TIMEOUT) + .with_var(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE) .with_var(&WEBHOOK_CONCURRENT_REQUEST_LIMIT) .with_var(&ENABLE_COLUMNATION_LGALLOC) .with_var(&TIMESTAMP_ORACLE_IMPL); @@ -3586,6 +3595,11 @@ impl SystemVars { *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT) } + /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter. + pub fn privatelink_status_update_quota_per_minute(&self) -> u32 { + *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE) + } + /// Returns the `statement_logging_max_sample_rate` configuration parameter. pub fn statement_logging_max_sample_rate(&self) -> Numeric { *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)