Skip to content

Commit

Permalink
Run a k8s watch on VPC Endpoints in coordinator-spawned task, write t…
Browse files Browse the repository at this point in the history
…o system table
  • Loading branch information
rjobanp committed Dec 4, 2023
1 parent fabfcbc commit cc2e6e4
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 10 deletions.
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ differential-dataflow = "0.12.0"
enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
itertools = "0.10.5"
once_cell = "1.16.0"
Expand Down
6 changes: 5 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use mz_adapter_types::compaction::DEFAULT_LOGICAL_COMPACTION_WINDOW_TS;
use mz_adapter_types::connection::ConnectionId;
use mz_build_info::BuildInfo;
use mz_catalog::memory::objects::{CatalogEntry, CatalogItem, Connection, DataSourceDesc, Source};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_compute_types::ComputeInstanceId;
Expand Down Expand Up @@ -172,6 +172,7 @@ mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod read_policy;
mod sequencer;
mod sql;
Expand Down Expand Up @@ -234,6 +235,7 @@ pub enum Message<T = mz_repr::Timestamp> {
stage: PeekStage,
},
DrainStatementLog,
PrivateLinkVpcEndpointEvents(BTreeMap<GlobalId, VpcEndpointEvent>),
}

impl Message {
Expand Down Expand Up @@ -274,6 +276,7 @@ impl Message {
Message::PeekStageReady { .. } => "peek_stage_ready",
Message::DrainStatementLog => "drain_statement_log",
Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
}
}
}
Expand Down Expand Up @@ -2014,6 +2017,7 @@ impl Coordinator {
});

self.schedule_storage_usage_collection().await;
self.spawn_privatelink_vpc_endpoints_watch_task();
self.spawn_statement_logging_task();
flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ impl Coordinator {
Message::DrainStatementLog => {
self.drain_statement_log().await;
}
Message::PrivateLinkVpcEndpointEvents(events) => {
self.write_privatelink_status_updates(events).await;
}
}
}
.boxed_local()
Expand Down
90 changes: 90 additions & 0 deletions src/adapter/src/coord/privatelink_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroU32;
use std::{collections::BTreeMap, sync::Arc};

use futures::stream::StreamExt;
use futures::FutureExt;
use governor::{Quota, RateLimiter};

use mz_cloud_resources::VpcEndpointEvent;
use mz_ore::task::spawn;
use mz_repr::{Datum, GlobalId, Row};
use mz_storage_client::controller::IntrospectionType;

use crate::coord::Coordinator;

use super::Message;

impl Coordinator {
/// Spawns a background task that watches VPC endpoint events from the cloud
/// resource controller and forwards them to the coordinator as
/// [`Message::PrivateLinkVpcEndpointEvents`] batches.
///
/// Batches are rate-limited by the
/// `privatelink_status_update_quota_per_minute` system configuration value
/// and deduplicated by connection ID, so that only the most recently received
/// event per connection is forwarded in each batch.
///
/// If no cloud resource controller is configured, no task is spawned. The
/// task exits once the event stream is exhausted.
///
/// # Panics
///
/// Panics if the configured quota cannot be converted to a `u32`. The spawned
/// task panics if the quota is zero.
pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self) {
    let internal_cmd_tx = self.internal_cmd_tx.clone();
    // The quota must be a `u32` because it is handed to `NonZeroU32::new` below.
    let rate_quota: u32 = self
        .catalog
        .system_config()
        .privatelink_status_update_quota_per_minute()
        .try_into()
        .expect("value couldn't be converted to u32");
    if let Some(controller) = &self.cloud_resource_controller {
        let controller = Arc::clone(controller);
        spawn(|| "privatelink_vpc_endpoint_watch", async move {
            let mut stream = controller.watch_vpc_endpoints().await;
            // Using a per-minute quota implies a burst-size of the same amount
            let rate_limiter = RateLimiter::direct(Quota::per_minute(
                NonZeroU32::new(rate_quota).expect("will be non-zero"),
            ));

            // Wait for an event to become available. Stop once the stream is
            // exhausted: polling a finished stream in a bare `loop` would spin
            // without ever yielding.
            while let Some(event) = stream.next().await {
                // Wait until we're permitted to tell the coordinator about the event
                rate_limiter.until_ready().await;

                // Drain any additional events from the queue that accumulated while
                // waiting for the rate limiter, deduplicating by connection_id.
                // Later events overwrite earlier ones for the same connection.
                let mut events = BTreeMap::new();
                events.insert(event.connection_id, event);
                while let Some(event) = stream.next().now_or_never().and_then(|e| e) {
                    events.insert(event.connection_id, event);
                }

                // Send the event batch to the coordinator. A failed send is
                // deliberately ignored (best-effort delivery).
                let _ = internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(events));
            }
        });
    }
}

/// Records a batch of VPC endpoint events as introspection updates of type
/// `IntrospectionType::PrivatelinkConnectionStatusHistory`.
///
/// Each event becomes one row of `(time, connection_id, status)` with a diff
/// of `1`. The connection ID and status are stored in their string forms.
///
/// # Panics
///
/// Panics if an event's time cannot be converted into the timestamp datum
/// representation.
pub(crate) async fn write_privatelink_status_updates(
    &mut self,
    events: BTreeMap<GlobalId, VpcEndpointEvent>,
) {
    let updates = events
        .into_values()
        .map(|event| {
            // Keep the rendered strings alive while the row borrows them.
            let connection_id = event.connection_id.to_string();
            let status = event.status.to_string();
            let row = Row::pack_slice(&[
                Datum::TimestampTz(event.time.try_into().expect("must fit")),
                Datum::String(&connection_id),
                Datum::String(&status),
            ]);
            (row, 1)
        })
        .collect::<Vec<_>>();

    let storage = &mut self.controller.storage;
    storage
        .record_introspection_updates(
            IntrospectionType::PrivatelinkConnectionStatusHistory,
            updates,
        )
        .await;
}
}
2 changes: 2 additions & 0 deletions src/cloud-resources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ anyhow = "1.0.66"
async-trait = "0.1.68"
k8s-openapi = { version = "0.20.0", features = ["schemars", "v1_26"] }
kube = { version = "0.87.1", default-features = false, features = ["client", "derive", "openssl-tls", "ws"] }
chrono = { version = "0.4.23", default-features = false }
futures = "0.3.25"
mz-ore = { path = "../ore", features = [] }
mz-repr = { path = "../repr" }
schemars = { version = "0.8", features = ["uuid1"] }
Expand Down
22 changes: 22 additions & 0 deletions src/cloud-resources/src/crd/vpc_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

//! VpcEndpoint custom resource, to be reconciled into an AWS VPC Endpoint by the
//! environment-controller.
use std::fmt;

use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use kube::CustomResource;
Expand Down Expand Up @@ -78,6 +79,27 @@ pub mod v1 {
Rejected,
Unknown,
}

impl fmt::Display for VpcEndpointState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let repr = match self {
VpcEndpointState::PendingServiceDiscovery => "pending-service-discovery",
VpcEndpointState::CreatingEndpoint => "creating-endpoint",
VpcEndpointState::RecreatingEndpoint => "recreating-endpoint",
VpcEndpointState::UpdatingEndpoint => "updating-endpoint",
VpcEndpointState::Available => "available",
VpcEndpointState::Deleted => "deleted",
VpcEndpointState::Deleting => "deleting",
VpcEndpointState::Expired => "expired",
VpcEndpointState::Failed => "failed",
VpcEndpointState::Pending => "pending",
VpcEndpointState::PendingAcceptance => "pending-acceptance",
VpcEndpointState::Rejected => "rejected",
VpcEndpointState::Unknown => "unknown",
};
write!(f, "{}", repr)
}
}
}

#[cfg(test)]
Expand Down
22 changes: 21 additions & 1 deletion src/cloud-resources/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@

use std::collections::BTreeMap;
use std::fmt::{self, Debug};
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use mz_repr::GlobalId;
use serde::{Deserialize, Serialize};

use crate::crd::vpc_endpoint::v1::VpcEndpointStatus;
use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};

pub mod crd;

Expand Down Expand Up @@ -153,6 +156,9 @@ pub trait CloudResourceController: CloudResourceReader {
&self,
) -> Result<BTreeMap<GlobalId, VpcEndpointStatus>, anyhow::Error>;

/// Watches `VpcEndpoint` Kubernetes objects, returning a stream of `VpcEndpointEvent`s.
async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;

/// Returns a reader for the resources managed by this controller.
fn reader(&self) -> Arc<dyn CloudResourceReader>;
}
Expand All @@ -170,6 +176,13 @@ pub fn vpc_endpoint_name(id: GlobalId) -> String {
format!("connection-{id}")
}

/// Tries to recover the [`GlobalId`] embedded in a VPC endpoint name.
///
/// The name is split at its first `-`; everything after it is parsed as a
/// `GlobalId`. Returns `None` if the name contains no `-` or if the remainder
/// does not parse.
pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<GlobalId> {
    let (_prefix, id_str) = vpc_endpoint_name.split_once('-')?;
    GlobalId::from_str(id_str).ok()
}

/// Returns the host to use for the VPC endpoint with the given ID and
/// optionally in the given availability zone.
pub fn vpc_endpoint_host(id: GlobalId, availability_zone: Option<&str>) -> String {
Expand All @@ -181,3 +194,10 @@ pub fn vpc_endpoint_host(id: GlobalId, availability_zone: Option<&str>) -> Strin
None => name,
}
}

/// An event describing the state of a VPC endpoint, as yielded by the stream
/// returned from `CloudResourceController::watch_vpc_endpoints`.
#[derive(Debug)]
pub struct VpcEndpointEvent {
    /// The ID of the connection that the VPC endpoint belongs to.
    pub connection_id: GlobalId,
    /// The state of the VPC endpoint reported by this event.
    pub status: VpcEndpointState,
    /// The timestamp (UTC) associated with this event.
    // NOTE(review): presumably the time the state was observed or last
    // changed — confirm against the implementation that produces events.
    pub time: DateTime<Utc>,
}
Loading

0 comments on commit cc2e6e4

Please sign in to comment.