Skip to content

Commit

Permalink
mz_aws_privatelink_connection_status_history table as a storage-manag…
Browse files Browse the repository at this point in the history
…ed collection with truncation on startup
  • Loading branch information
rjobanp committed Dec 4, 2023
1 parent 2d915f7 commit fabfcbc
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters {
},
pg_source_snapshot_statement_timeout: config.pg_source_snapshot_statement_timeout(),
keep_n_source_status_history_entries: config.keep_n_source_status_history_entries(),
keep_n_sink_status_history_entries: config.keep_n_source_status_history_entries(),
keep_n_sink_status_history_entries: config.keep_n_sink_status_history_entries(),
keep_n_privatelink_status_history_entries: config
.keep_n_privatelink_status_history_entries(),
upsert_rocksdb_tuning_config: {
match mz_rocksdb_types::RocksDBTuningParameters::from_parameters(
config.upsert_rocksdb_compaction_style(),
Expand Down
16 changes: 14 additions & 2 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ use mz_sql::session::user::{
};
use mz_storage_client::controller::IntrospectionType;
use mz_storage_client::healthcheck::{
MZ_PREPARED_STATEMENT_HISTORY_DESC, MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
MZ_SOURCE_STATUS_HISTORY_DESC, MZ_STATEMENT_EXECUTION_HISTORY_DESC,
MZ_PREPARED_STATEMENT_HISTORY_DESC, MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC,
MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
MZ_STATEMENT_EXECUTION_HISTORY_DESC,
};
use once_cell::sync::Lazy;
use serde::Serialize;
Expand Down Expand Up @@ -2412,6 +2413,16 @@ pub static MZ_SOURCE_STATUS_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| BuiltinS
sensitivity: DataSensitivity::Public,
});

pub static MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY: Lazy<BuiltinSource> =
Lazy::new(|| BuiltinSource {
name: "mz_aws_privatelink_connection_status_history",
schema: MZ_INTERNAL_SCHEMA,
data_source: Some(IntrospectionType::PrivatelinkConnectionStatusHistory),
desc: MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
sensitivity: DataSensitivity::Public,
});

pub static MZ_STATEMENT_EXECUTION_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
name: "mz_statement_execution_history",
schema: MZ_INTERNAL_SCHEMA,
Expand Down Expand Up @@ -6211,6 +6222,7 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::Source(&MZ_SINK_STATUS_HISTORY),
Builtin::View(&MZ_SINK_STATUSES),
Builtin::Source(&MZ_SOURCE_STATUS_HISTORY),
Builtin::Source(&MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY),
Builtin::Source(&MZ_STATEMENT_EXECUTION_HISTORY),
Builtin::View(&MZ_STATEMENT_EXECUTION_HISTORY_REDACTED),
Builtin::Source(&MZ_PREPARED_STATEMENT_HISTORY),
Expand Down
14 changes: 14 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,15 @@ const KEEP_N_SINK_STATUS_HISTORY_ENTRIES: ServerVar<usize> = ServerVar {
internal: true
};

/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_privatelink_status_history_entries`].
const KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: ServerVar<usize> = ServerVar {
name: UncasedStr::new("keep_n_privatelink_status_history_entries"),
value: &5,
description: "On reboot, truncate all but the last n entries per ID in the mz_aws_privatelink_connection_status_history \
collection (Materialize).",
internal: true
};

const ENABLE_STORAGE_SHARD_FINALIZATION: ServerVar<bool> = ServerVar {
name: UncasedStr::new("enable_storage_shard_finalization"),
value: &true,
Expand Down Expand Up @@ -2792,6 +2801,7 @@ impl SystemVars {
.with_var(&MAX_CONNECTIONS)
.with_var(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
.with_var(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
.with_var(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
.with_var(&ENABLE_MZ_JOIN_CORE)
.with_var(&LINEAR_JOIN_YIELDING)
.with_var(&DEFAULT_IDLE_ARRANGEMENT_MERGE_EFFORT)
Expand Down Expand Up @@ -3461,6 +3471,10 @@ impl SystemVars {
*self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
}

pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
*self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
}

/// Returns the `enable_mz_join_core` configuration parameter.
pub fn enable_mz_join_core(&self) -> bool {
*self.expect_value(&ENABLE_MZ_JOIN_CORE)
Expand Down
3 changes: 3 additions & 0 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub enum IntrospectionType {
// Collections written by the compute controller.
ComputeDependencies,
ComputeReplicaHeartbeats,

// Written by the Adapter for tracking AWS PrivateLink Connection Status History
PrivatelinkConnectionStatusHistory,
}

/// Describes how data is written to the collection.
Expand Down
10 changes: 10 additions & 0 deletions src/storage-client/src/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,13 @@ pub static MZ_SINK_STATUS_HISTORY_DESC: Lazy<RelationDesc> = Lazy::new(|| {
.with_column("error", ScalarType::String.nullable(true))
.with_column("details", ScalarType::Jsonb.nullable(true))
});

pub static MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC: Lazy<RelationDesc> = Lazy::new(|| {
RelationDesc::empty()
.with_column(
"occurred_at",
ScalarType::TimestampTz { precision: None }.nullable(false),
)
.with_column("connection_id", ScalarType::String.nullable(false))
.with_column("status", ScalarType::String.nullable(false))
});
17 changes: 17 additions & 0 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,12 @@ where
)
.await;
}
IntrospectionType::PrivatelinkConnectionStatusHistory => {
self.partially_truncate_status_history(
IntrospectionType::PrivatelinkConnectionStatusHistory,
)
.await;
}

// Truncate compute-maintained collections.
IntrospectionType::ComputeDependencies
Expand Down Expand Up @@ -2634,6 +2640,17 @@ where
.expect("schema has not changed")
.0,
),
IntrospectionType::PrivatelinkConnectionStatusHistory => (
self.config.keep_n_privatelink_status_history_entries,
healthcheck::MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC
.get_by_name(&ColumnName::from("occurred_at"))
.expect("schema has not changed")
.0,
healthcheck::MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC
.get_by_name(&ColumnName::from("connection_id"))
.expect("schema has not changed")
.0,
),
_ => unreachable!(),
};

Expand Down
1 change: 1 addition & 0 deletions src/storage-types/src/parameters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message ProtoStorageParameters {
ProtoPgSourceTcpTimeouts pg_source_tcp_timeouts = 16;
mz_proto.ProtoDuration pg_source_snapshot_statement_timeout = 17;
bool record_namespaced_errors = 18;
uint64 keep_n_privatelink_status_history_entries = 19;
}


Expand Down
10 changes: 10 additions & 0 deletions src/storage-types/src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct StorageParameters {
pub pg_source_snapshot_statement_timeout: Duration,
pub keep_n_source_status_history_entries: usize,
pub keep_n_sink_status_history_entries: usize,
pub keep_n_privatelink_status_history_entries: usize,
/// A set of parameters used to tune RocksDB when used with `UPSERT` sources.
pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters,
/// Whether or not to allow shard finalization to occur. Note that this will
Expand Down Expand Up @@ -65,6 +66,7 @@ impl Default for StorageParameters {
pg_source_snapshot_statement_timeout: Default::default(),
keep_n_source_status_history_entries: Default::default(),
keep_n_sink_status_history_entries: Default::default(),
keep_n_privatelink_status_history_entries: Default::default(),
upsert_rocksdb_tuning_config: Default::default(),
finalize_shards: Default::default(),
tracing: Default::default(),
Expand Down Expand Up @@ -152,6 +154,7 @@ impl StorageParameters {
pg_source_snapshot_statement_timeout,
keep_n_source_status_history_entries,
keep_n_sink_status_history_entries,
keep_n_privatelink_status_history_entries,
upsert_rocksdb_tuning_config,
finalize_shards,
tracing,
Expand All @@ -168,6 +171,7 @@ impl StorageParameters {
self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout;
self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries;
self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries;
self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
self.finalize_shards = finalize_shards;
self.tracing.update(tracing);
Expand Down Expand Up @@ -196,6 +200,9 @@ impl RustType<ProtoStorageParameters> for StorageParameters {
keep_n_sink_status_history_entries: u64::cast_from(
self.keep_n_sink_status_history_entries,
),
keep_n_privatelink_status_history_entries: u64::cast_from(
self.keep_n_privatelink_status_history_entries,
),
upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()),
finalize_shards: self.finalize_shards,
tracing: Some(self.tracing.into_proto()),
Expand Down Expand Up @@ -231,6 +238,9 @@ impl RustType<ProtoStorageParameters> for StorageParameters {
keep_n_sink_status_history_entries: usize::cast_from(
proto.keep_n_sink_status_history_entries,
),
keep_n_privatelink_status_history_entries: usize::cast_from(
proto.keep_n_privatelink_status_history_entries,
),
upsert_rocksdb_tuning_config: proto
.upsert_rocksdb_tuning_config
.into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?,
Expand Down

0 comments on commit fabfcbc

Please sign in to comment.