Skip to content

Commit

Permalink
Merge pull request MaterializeInc#23284 from rjobanp/privatelink-stat…
Browse files Browse the repository at this point in the history
…us-history

Implement PrivateLink Connection Status History Table
  • Loading branch information
rjobanp authored Dec 5, 2023
2 parents fd374ee + daa77c2 commit 18d3e23
Show file tree
Hide file tree
Showing 25 changed files with 585 additions and 15 deletions.
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,20 @@ messages and additional metadata helpful for debugging.
| `error` | [`text`] | If the source is in an error state, the error message. |
| `details` | [`jsonb`] | Additional metadata provided by the source. In case of error, may contain a `hint` field with helpful suggestions. |


### `mz_aws_privatelink_connection_status_history`

The `mz_aws_privatelink_connection_status_history` table contains a row describing
each historical status change for each AWS PrivateLink connection in the system.

<!-- RELATION_SPEC mz_internal.mz_aws_privatelink_connection_status_history -->
| Field | Type | Meaning |
|-------------------|----------------------------|------------------------------------------------------------|
| `occurred_at` | `timestamp with time zone` | Wall-clock timestamp of the status change. |
| `connection_id` | `text` | The unique identifier of the AWS PrivateLink connection. Corresponds to [`mz_catalog.mz_connections.id`](../mz_catalog#mz_connections). |
| `status` | `text` | The status of the connection: one of `pending-service-discovery`, `creating-endpoint`, `recreating-endpoint`, `updating-endpoint`, `available`, `deleted`, `deleting`, `expired`, `failed`, `pending`, `pending-acceptance`, `rejected`, or `unknown`. |


<!--
### `mz_statement_execution_history`
Expand Down
25 changes: 25 additions & 0 deletions misc/cargo-vet/audits.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,31 @@ who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:9623aaad1020b799e43c6e59d93624d6c8449699"

[[audits.futures-timer]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "3.0.2"

[[audits.governor]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "0.6.0"

[[audits.nonzero_ext]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "0.3.0"

[[audits.quanta]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "0.11.1"

[[audits.raw-cpuid]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "10.7.0"

[[audits.timely]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
Expand Down
17 changes: 17 additions & 0 deletions misc/cargo-vet/imports.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,17 @@ criteria = "safe-to-deploy"
version = "1.0.1"
notes = "No unsafe usage or ambient capabilities"

[[audits.embark-studios.audits.no-std-compat]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
version = "0.2.0"
notes = "No unsafe usage or ambient capabilities"

[[audits.embark-studios.audits.no-std-compat]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
delta = "0.2.0 -> 0.4.1"

[[audits.embark-studios.audits.similar]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
Expand Down Expand Up @@ -1442,6 +1453,12 @@ criteria = "safe-to-deploy"
version = "0.4.17"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"

[[audits.mozilla.audits.mach2]]
who = "Gabriele Svelto <gsvelto@mozilla.com>"
criteria = "safe-to-deploy"
version = "0.4.1"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"

[[audits.mozilla.audits.matches]]
who = "Bobby Holley <bobbyholley@gmail.com>"
criteria = "safe-to-deploy"
Expand Down
1 change: 1 addition & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ differential-dataflow = "0.12.0"
enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
itertools = "0.10.5"
once_cell = "1.16.0"
Expand Down
6 changes: 5 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use mz_adapter_types::compaction::DEFAULT_LOGICAL_COMPACTION_WINDOW_TS;
use mz_adapter_types::connection::ConnectionId;
use mz_build_info::BuildInfo;
use mz_catalog::memory::objects::{CatalogEntry, CatalogItem, Connection, DataSourceDesc, Source};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_compute_types::ComputeInstanceId;
Expand Down Expand Up @@ -172,6 +172,7 @@ mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod read_policy;
mod sequencer;
mod sql;
Expand Down Expand Up @@ -227,6 +228,7 @@ pub enum Message<T = mz_repr::Timestamp> {
stage: PeekStage,
},
DrainStatementLog,
PrivateLinkVpcEndpointEvents(BTreeMap<GlobalId, VpcEndpointEvent>),
}

impl Message {
Expand Down Expand Up @@ -267,6 +269,7 @@ impl Message {
Message::PeekStageReady { .. } => "peek_stage_ready",
Message::DrainStatementLog => "drain_statement_log",
Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
}
}
}
Expand Down Expand Up @@ -2027,6 +2030,7 @@ impl Coordinator {
});

self.schedule_storage_usage_collection().await;
self.spawn_privatelink_vpc_endpoints_watch_task();
self.spawn_statement_logging_task();
flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ impl Coordinator {
Message::DrainStatementLog => {
self.drain_statement_log().await;
}
Message::PrivateLinkVpcEndpointEvents(events) => {
self.write_privatelink_status_updates(events).await;
}
}
}
.boxed_local()
Expand Down
108 changes: 108 additions & 0 deletions src/adapter/src/coord/privatelink_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroU32;
use std::{collections::BTreeMap, sync::Arc};

use governor::{Quota, RateLimiter};

use mz_cloud_resources::VpcEndpointEvent;
use mz_ore::future::OreStreamExt;
use mz_ore::task::spawn;
use mz_repr::{Datum, GlobalId, Row};
use mz_storage_client::controller::IntrospectionType;

use crate::coord::Coordinator;

use super::Message;

impl Coordinator {
    /// Spawns a background task that forwards AWS PrivateLink VPC endpoint
    /// status events to the coordinator.
    ///
    /// This is a no-op when no cloud resource controller is configured.
    /// Otherwise the task watches the controller's VPC endpoint event stream,
    /// drops events that are not newer than the last one recorded for the same
    /// connection, and sends the remainder to the coordinator as
    /// [`Message::PrivateLinkVpcEndpointEvents`], rate limited by the
    /// `privatelink_status_update_quota_per_minute` system configuration value.
    ///
    /// The task exits when the event stream terminates or when the
    /// coordinator's internal command channel is closed.
    ///
    /// # Panics
    ///
    /// The spawned task panics if the configured quota is zero.
    pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let rate_quota: u32 = self
            .catalog
            .system_config()
            .privatelink_status_update_quota_per_minute();

        if let Some(controller) = &self.cloud_resource_controller {
            // Retrieve the timestamp of the last event written to the status table for each id
            // to avoid writing duplicate rows
            let mut last_written_events = self
                .controller
                .storage
                .get_privatelink_status_table_latest()
                .clone()
                .unwrap_or_default();

            let controller = Arc::clone(controller);
            spawn(|| "privatelink_vpc_endpoint_watch", async move {
                let mut stream = controller.watch_vpc_endpoints().await;
                // Using a per-minute quota implies a burst-size of the same amount
                let rate_limiter = RateLimiter::direct(Quota::per_minute(
                    NonZeroU32::new(rate_quota).expect("will be non-zero"),
                ));

                // Wait for events to become available. `recv_many` yields `None` once the
                // stream has terminated; stop then rather than re-polling a finished stream
                // in a tight loop.
                while let Some(new_events) = stream.recv_many(20).await {
                    // Events to be written, de-duped by connection_id
                    let mut event_map = BTreeMap::new();

                    for event in new_events {
                        match last_written_events.get(&event.connection_id) {
                            // Ignore if an event with this time was already written
                            Some(time) if time >= &event.time => {}
                            _ => {
                                last_written_events
                                    .insert(event.connection_id.clone(), event.time.clone());
                                event_map.insert(event.connection_id, event);
                            }
                        }
                    }

                    // Nothing new in this batch: don't spend rate-limit quota or wake the
                    // coordinator for an empty update.
                    if event_map.is_empty() {
                        continue;
                    }

                    // Wait until we're permitted to tell the coordinator about the events
                    // Note that the stream is backed by a https://docs.rs/kube/latest/kube/runtime/fn.watcher.html,
                    // which means it's safe for us to rate limit for an arbitrarily long time and expect the stream
                    // to continue to work, despite not being polled
                    rate_limiter.until_ready().await;

                    // Send the event batch to the coordinator to be written. A send error
                    // means the receiving side has been dropped, so no future batch can be
                    // delivered either and the task can stop.
                    if internal_cmd_tx
                        .send(Message::PrivateLinkVpcEndpointEvents(event_map))
                        .is_err()
                    {
                        break;
                    }
                }
            });
        }
    }

    /// Records a batch of PrivateLink VPC endpoint events in the
    /// `PrivatelinkConnectionStatusHistory` introspection collection.
    ///
    /// Each event becomes one row of `(time, connection_id, status)` with a
    /// diff of `1`; the map keys are not used.
    ///
    /// # Panics
    ///
    /// Panics if an event's time cannot be converted into the value expected
    /// by `Datum::TimestampTz`.
    pub(crate) async fn write_privatelink_status_updates(
        &mut self,
        events: BTreeMap<GlobalId, VpcEndpointEvent>,
    ) {
        let updates: Vec<_> = events
            .into_values()
            .map(|value| {
                (
                    Row::pack_slice(&[
                        Datum::TimestampTz(value.time.try_into().expect("must fit")),
                        Datum::String(&value.connection_id.to_string()),
                        Datum::String(&value.status.to_string()),
                    ]),
                    1,
                )
            })
            .collect();

        self.controller
            .storage
            .record_introspection_updates(
                IntrospectionType::PrivatelinkConnectionStatusHistory,
                updates,
            )
            .await;
    }
}
4 changes: 3 additions & 1 deletion src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters {
},
pg_source_snapshot_statement_timeout: config.pg_source_snapshot_statement_timeout(),
keep_n_source_status_history_entries: config.keep_n_source_status_history_entries(),
keep_n_sink_status_history_entries: config.keep_n_source_status_history_entries(),
keep_n_sink_status_history_entries: config.keep_n_sink_status_history_entries(),
keep_n_privatelink_status_history_entries: config
.keep_n_privatelink_status_history_entries(),
upsert_rocksdb_tuning_config: {
match mz_rocksdb_types::RocksDBTuningParameters::from_parameters(
config.upsert_rocksdb_compaction_style(),
Expand Down
Loading

0 comments on commit 18d3e23

Please sign in to comment.