From fabfcbc635770ba49adb388813634d576bbedba3 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Mon, 13 Nov 2023 13:00:51 -0500 Subject: [PATCH] mz_aws_privatelink_connection_status_history table as a storage-managed collection with truncation on startup --- src/adapter/src/flags.rs | 4 +++- src/catalog/src/builtin.rs | 16 ++++++++++++++-- src/sql/src/session/vars.rs | 14 ++++++++++++++ src/storage-client/src/controller.rs | 3 +++ src/storage-client/src/healthcheck.rs | 10 ++++++++++ src/storage-controller/src/lib.rs | 17 +++++++++++++++++ src/storage-types/src/parameters.proto | 1 + src/storage-types/src/parameters.rs | 10 ++++++++++ 8 files changed, 72 insertions(+), 3 deletions(-) diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index 000bb6e49734..dac682a9e67a 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -84,7 +84,9 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters { }, pg_source_snapshot_statement_timeout: config.pg_source_snapshot_statement_timeout(), keep_n_source_status_history_entries: config.keep_n_source_status_history_entries(), - keep_n_sink_status_history_entries: config.keep_n_source_status_history_entries(), + keep_n_sink_status_history_entries: config.keep_n_sink_status_history_entries(), + keep_n_privatelink_status_history_entries: config + .keep_n_privatelink_status_history_entries(), upsert_rocksdb_tuning_config: { match mz_rocksdb_types::RocksDBTuningParameters::from_parameters( config.upsert_rocksdb_compaction_style(), diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index 91d539b4dafe..93fefc862b5a 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -44,8 +44,9 @@ use mz_sql::session::user::{ }; use mz_storage_client::controller::IntrospectionType; use mz_storage_client::healthcheck::{ - MZ_PREPARED_STATEMENT_HISTORY_DESC, MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, - MZ_SOURCE_STATUS_HISTORY_DESC, MZ_STATEMENT_EXECUTION_HISTORY_DESC, + MZ_PREPARED_STATEMENT_HISTORY_DESC, MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, + MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, + MZ_STATEMENT_EXECUTION_HISTORY_DESC, }; use once_cell::sync::Lazy; use serde::Serialize; @@ -2412,6 +2413,16 @@ pub static MZ_SOURCE_STATUS_HISTORY: Lazy = Lazy::new(|| BuiltinS sensitivity: DataSensitivity::Public, }); +pub static MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY: Lazy = + Lazy::new(|| BuiltinSource { + name: "mz_aws_privatelink_connection_status_history", + schema: MZ_INTERNAL_SCHEMA, + data_source: Some(IntrospectionType::PrivatelinkConnectionStatusHistory), + desc: MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC.clone(), + is_retained_metrics_object: false, + sensitivity: DataSensitivity::Public, + }); + pub static MZ_STATEMENT_EXECUTION_HISTORY: Lazy = Lazy::new(|| BuiltinSource { name: "mz_statement_execution_history", schema: MZ_INTERNAL_SCHEMA, @@ -6211,6 +6222,7 @@ pub static BUILTINS_STATIC: Lazy>> = Lazy::new(|| { Builtin::Source(&MZ_SINK_STATUS_HISTORY), Builtin::View(&MZ_SINK_STATUSES), Builtin::Source(&MZ_SOURCE_STATUS_HISTORY), + Builtin::Source(&MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY), Builtin::Source(&MZ_STATEMENT_EXECUTION_HISTORY), Builtin::View(&MZ_STATEMENT_EXECUTION_HISTORY_REDACTED), Builtin::Source(&MZ_PREPARED_STATEMENT_HISTORY), diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 5fa0b8f9e121..d27f80d5ca32 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1433,6 +1433,15 @@ const KEEP_N_SINK_STATUS_HISTORY_ENTRIES: ServerVar = ServerVar { internal: true }; +/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_privatelink_status_history_entries`]. +const KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: ServerVar = ServerVar { + name: UncasedStr::new("keep_n_privatelink_status_history_entries"), + value: &5, + description: "On reboot, truncate all but the last n entries per ID in the mz_aws_privatelink_connection_status_history \ + collection (Materialize).", + internal: true +}; + const ENABLE_STORAGE_SHARD_FINALIZATION: ServerVar = ServerVar { name: UncasedStr::new("enable_storage_shard_finalization"), value: &true, @@ -2792,6 +2801,7 @@ impl SystemVars { .with_var(&MAX_CONNECTIONS) .with_var(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES) .with_var(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES) + .with_var(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES) .with_var(&ENABLE_MZ_JOIN_CORE) .with_var(&LINEAR_JOIN_YIELDING) .with_var(&DEFAULT_IDLE_ARRANGEMENT_MERGE_EFFORT) @@ -3461,6 +3471,10 @@ impl SystemVars { *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES) } + pub fn keep_n_privatelink_status_history_entries(&self) -> usize { + *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES) + } + /// Returns the `enable_mz_join_core` configuration parameter. pub fn enable_mz_join_core(&self) -> bool { *self.expect_value(&ENABLE_MZ_JOIN_CORE) diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 3ecda5a415c9..f37834214060 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -69,6 +69,9 @@ pub enum IntrospectionType { // Collections written by the compute controller. ComputeDependencies, ComputeReplicaHeartbeats, + + // Written by the Adapter for tracking AWS PrivateLink Connection Status History + PrivatelinkConnectionStatusHistory, } /// Describes how data is written to the collection. diff --git a/src/storage-client/src/healthcheck.rs b/src/storage-client/src/healthcheck.rs index a8a58b22fe11..69a80a1f6d29 100644 --- a/src/storage-client/src/healthcheck.rs +++ b/src/storage-client/src/healthcheck.rs @@ -104,3 +104,13 @@ pub static MZ_SINK_STATUS_HISTORY_DESC: Lazy = Lazy::new(|| { .with_column("error", ScalarType::String.nullable(true)) .with_column("details", ScalarType::Jsonb.nullable(true)) }); + +pub static MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC: Lazy = Lazy::new(|| { + RelationDesc::empty() + .with_column( + "occurred_at", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_column("connection_id", ScalarType::String.nullable(false)) + .with_column("status", ScalarType::String.nullable(false)) +}); diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 48f5afa000ab..7dcbf634b71c 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -917,6 +917,12 @@ where ) .await; } + IntrospectionType::PrivatelinkConnectionStatusHistory => { + self.partially_truncate_status_history( + IntrospectionType::PrivatelinkConnectionStatusHistory, + ) + .await; + } // Truncate compute-maintained collections. IntrospectionType::ComputeDependencies @@ -2634,6 +2640,17 @@ where .expect("schema has not changed") .0, ), + IntrospectionType::PrivatelinkConnectionStatusHistory => ( + self.config.keep_n_privatelink_status_history_entries, + healthcheck::MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC + .get_by_name(&ColumnName::from("occurred_at")) + .expect("schema has not changed") + .0, + healthcheck::MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC + .get_by_name(&ColumnName::from("connection_id")) + .expect("schema has not changed") + .0, + ), _ => unreachable!(), }; diff --git a/src/storage-types/src/parameters.proto b/src/storage-types/src/parameters.proto index e10c8aa2a702..955609f361ff 100644 --- a/src/storage-types/src/parameters.proto +++ b/src/storage-types/src/parameters.proto @@ -34,6 +34,7 @@ message ProtoStorageParameters { ProtoPgSourceTcpTimeouts pg_source_tcp_timeouts = 16; mz_proto.ProtoDuration pg_source_snapshot_statement_timeout = 17; bool record_namespaced_errors = 18; + uint64 keep_n_privatelink_status_history_entries = 19; } diff --git a/src/storage-types/src/parameters.rs b/src/storage-types/src/parameters.rs index 0c36165859f7..2eb0e63913e4 100644 --- a/src/storage-types/src/parameters.rs +++ b/src/storage-types/src/parameters.rs @@ -32,6 +32,7 @@ pub struct StorageParameters { pub pg_source_snapshot_statement_timeout: Duration, pub keep_n_source_status_history_entries: usize, pub keep_n_sink_status_history_entries: usize, + pub keep_n_privatelink_status_history_entries: usize, /// A set of parameters used to tune RocksDB when used with `UPSERT` sources. pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters, /// Whether or not to allow shard finalization to occur. Note that this will @@ -65,6 +66,7 @@ impl Default for StorageParameters { pg_source_snapshot_statement_timeout: Default::default(), keep_n_source_status_history_entries: Default::default(), keep_n_sink_status_history_entries: Default::default(), + keep_n_privatelink_status_history_entries: Default::default(), upsert_rocksdb_tuning_config: Default::default(), finalize_shards: Default::default(), tracing: Default::default(), @@ -152,6 +154,7 @@ impl StorageParameters { pg_source_snapshot_statement_timeout, keep_n_source_status_history_entries, keep_n_sink_status_history_entries, + keep_n_privatelink_status_history_entries, upsert_rocksdb_tuning_config, finalize_shards, tracing, @@ -168,6 +171,7 @@ impl StorageParameters { self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout; self.keep_n_source_status_history_entries = keep_n_source_status_history_entries; self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries; + self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries; self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config; self.finalize_shards = finalize_shards; self.tracing.update(tracing); @@ -196,6 +200,9 @@ impl RustType for StorageParameters { keep_n_sink_status_history_entries: u64::cast_from( self.keep_n_sink_status_history_entries, ), + keep_n_privatelink_status_history_entries: u64::cast_from( + self.keep_n_privatelink_status_history_entries, + ), upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()), finalize_shards: self.finalize_shards, tracing: Some(self.tracing.into_proto()), @@ -231,6 +238,9 @@ impl RustType for StorageParameters { keep_n_sink_status_history_entries: usize::cast_from( proto.keep_n_sink_status_history_entries, ), + keep_n_privatelink_status_history_entries: usize::cast_from( + proto.keep_n_privatelink_status_history_entries, + ), upsert_rocksdb_tuning_config: proto .upsert_rocksdb_tuning_config .into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?,