diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs
index b747c6feec94..5354c84038f3 100644
--- a/src/adapter/src/catalog.rs
+++ b/src/adapter/src/catalog.rs
@@ -489,13 +489,13 @@ impl CatalogState {
         id: ComputeInstanceId,
         name: String,
         introspection: Option<ComputeInstanceIntrospectionConfig>,
-        introspection_sources: Vec<(&'static BuiltinLog, GlobalId)>,
+        introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
     ) {
         let logging = match introspection {
             None => None,
             Some(introspection) => {
                 let mut active_logs = BTreeMap::new();
-                for (log, index_id) in introspection_sources {
+                for (log, index_id) in introspection_source_indexes {
                     let source_name = FullObjectName {
                         database: RawDatabaseSpecifier::Ambient,
                         schema: log.schema.into(),
@@ -3025,7 +3025,7 @@ impl Catalog {
         name: String,
         config: Option<ComputeInstanceIntrospectionConfig>,
         // These are the legacy, active logs of this compute instance
-        introspection_sources: Vec<(&'static BuiltinLog, GlobalId)>,
+        introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
     },
     CreateComputeInstanceReplica {
         id: ReplicaId,
@@ -3165,14 +3165,15 @@ impl Catalog {
             Op::CreateComputeInstance {
                 name,
                 config,
-                introspection_sources,
+                introspection_source_indexes,
             } => {
                 if is_reserved_name(&name) {
                     return Err(AdapterError::Catalog(Error::new(
                         ErrorKind::ReservedClusterName(name),
                     )));
                 }
-                let id = tx.insert_compute_instance(&name, &config, &introspection_sources)?;
+                let id =
+                    tx.insert_compute_instance(&name, &config, &introspection_source_indexes)?;
                 self.add_to_audit_log(
                     session,
                     &mut tx,
@@ -3185,7 +3186,7 @@ impl Catalog {
                     id,
                     name,
                     config,
-                    introspection_sources,
+                    introspection_source_indexes,
                 }]
             }
             Op::CreateComputeInstanceReplica {
@@ -3595,13 +3596,21 @@ impl Catalog {
                 id,
                 name,
                 config,
-                introspection_sources,
+                introspection_source_indexes,
             } => {
                 info!("create cluster {}", name);
                 let introspection_source_index_ids: Vec<GlobalId> =
-                    introspection_sources.iter().map(|(_, id)| *id).collect();
+                    introspection_source_indexes
+                        .iter()
+                        .map(|(_, id)| *id)
+                        .collect();
                 state
-                    .insert_compute_instance(id, name.clone(), config, introspection_sources)
+                    .insert_compute_instance(
+                        id,
+                        name.clone(),
+                        config,
+                        introspection_source_indexes,
+                    )
                     .await;
                 builtin_table_updates.push(state.pack_compute_instance_update(&name, 1));
                 for id in introspection_source_index_ids {
@@ -4030,7 +4039,7 @@ pub enum Op {
     CreateComputeInstance {
         name: String,
         config: Option<ComputeInstanceIntrospectionConfig>,
-        introspection_sources: Vec<(&'static BuiltinLog, GlobalId)>,
+        introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
     },
     CreateComputeInstanceReplica {
         name: String,
diff --git a/src/adapter/src/catalog/storage.rs b/src/adapter/src/catalog/storage.rs
index 4e515059f05a..f619a96fe42b 100644
--- a/src/adapter/src/catalog/storage.rs
+++ b/src/adapter/src/catalog/storage.rs
@@ -918,7 +918,7 @@ impl<'a, S: Append> Transaction<'a, S> {
         &mut self,
         cluster_name: &str,
         config: &Option<ComputeInstanceIntrospectionConfig>,
-        introspection_sources: &Vec<(&'static BuiltinLog, GlobalId)>,
+        introspection_source_indexes: &Vec<(&'static BuiltinLog, GlobalId)>,
     ) -> Result<ComputeInstanceId, Error> {
         let id = self.get_and_increment_id(COMPUTE_ID_ALLOC_KEY.to_string())?;
         let config = serde_json::to_string(config)
@@ -935,7 +935,7 @@ impl<'a, S: Append> Transaction<'a, S> {
             )));
         };

-        for (builtin, index_id) in introspection_sources {
+        for (builtin, index_id) in introspection_source_indexes {
            let index_id = if let GlobalId::System(id) = index_id {
                *id
            } else {
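The catalog and storage changes above are a mechanical rename, but the shape they thread through every layer is the same: a list of (&'static BuiltinLog, GlobalId) pairs tying each builtin log to the ID of the index built over it, which insert_compute_instance folds into the instance's map of active logs. A minimal sketch of that fold, using stand-in types rather than the real mz_repr and catalog ones:

// Stand-ins for the real catalog types, for illustration only.
use std::collections::BTreeMap;

struct BuiltinLog {
    schema: &'static str,
    name: &'static str,
}

type GlobalId = u64; // stand-in for mz_repr::GlobalId

static MZ_ARRANGEMENT_BATCHES: BuiltinLog = BuiltinLog {
    schema: "mz_catalog",
    name: "mz_arrangement_batches_internal",
};

// Fold (builtin log, index id) pairs into a name -> index-id map, the same
// shape `active_logs` takes in the diff above.
fn active_logs(
    introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
) -> BTreeMap<String, GlobalId> {
    introspection_source_indexes
        .into_iter()
        .map(|(log, index_id)| (format!("{}.{}", log.schema, log.name), index_id))
        .collect()
}

fn main() {
    let logs = active_logs(vec![(&MZ_ARRANGEMENT_BATCHES, 2)]);
    assert_eq!(logs["mz_catalog.mz_arrangement_batches_internal"], 2);
}

The BTreeMap mirrors the active_logs map in the diff; its sorted iteration keeps anything derived from the listing deterministic.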
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index f7b7bc57c39d..b640fac910d8 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -77,6 +77,7 @@ use chrono::{DateTime, Utc};
 use derivative::Derivative;
 use futures::StreamExt;
 use itertools::Itertools;
+use mz_ore::tracing::OpenTelemetryContext;
 use rand::seq::SliceRandom;
 use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
 use tokio::runtime::Handle as TokioHandle;
@@ -183,6 +184,7 @@ pub struct CreateSourceStatementReady {
     pub params: Params,
     pub depends_on: Vec<GlobalId>,
     pub original_stmt: Statement<Raw>,
+    pub otel_ctx: OpenTelemetryContext,
 }

 #[derive(Derivative)]
diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs
index ce976bdf189a..d9181b434109 100644
--- a/src/adapter/src/coord/command_handler.rs
+++ b/src/adapter/src/coord/command_handler.rs
@@ -12,6 +12,7 @@
 use std::sync::Arc;

+use mz_ore::tracing::OpenTelemetryContext;
 use rand::Rng;
 use tokio::sync::{oneshot, watch};
 use tracing::Instrument;
@@ -419,6 +420,7 @@ impl Coordinator {
                     stmt,
                     self.connection_context.clone(),
                 );
+                let otel_ctx = OpenTelemetryContext::obtain();
                 task::spawn(|| format!("purify:{conn_id}"), async move {
                     let result = purify_fut.await.map_err(|e| e.into());
                     internal_cmd_tx
@@ -430,6 +432,7 @@ impl Coordinator {
                                 params,
                                 depends_on,
                                 original_stmt,
+                                otel_ctx,
                             },
                         ))
                         .expect("sending to internal_cmd_tx cannot fail");
diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs
index 041efe53e8ab..1110616218f3 100644
--- a/src/adapter/src/coord/message_handler.rs
+++ b/src/adapter/src/coord/message_handler.rs
@@ -143,8 +143,11 @@ impl Coordinator {
             params,
             depends_on,
             original_stmt,
+            otel_ctx,
         }: CreateSourceStatementReady,
     ) {
+        otel_ctx.attach_as_parent();
+
         let stmt = match result {
             Ok(stmt) => stmt,
             Err(e) => return tx.send(Err(e), session),
diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs
index ab5b783af246..0362586901d1 100644
--- a/src/adapter/src/coord/sequencer.rs
+++ b/src/adapter/src/coord/sequencer.rs
@@ -579,15 +579,19 @@ impl Coordinator {
         }: CreateComputeInstancePlan,
     ) -> Result<ExecuteResponse, AdapterError> {
         tracing::debug!("sequence_create_compute_instance");
-        let introspection_sources = if compute_instance_config.is_some() {
+        let introspection_source_indexes = if compute_instance_config.is_some() {
             self.catalog.allocate_introspection_source_indexes().await
         } else {
             Vec::new()
         };
+        let introspection_source_index_ids: Vec<_> = introspection_source_indexes
+            .iter()
+            .map(|(_, id)| *id)
+            .collect();
         let mut ops = vec![catalog::Op::CreateComputeInstance {
             name: name.clone(),
             config: compute_instance_config.clone(),
-            introspection_sources,
+            introspection_source_indexes,
         }];

         let azs = self.catalog.state().availability_zones();
@@ -690,6 +694,14 @@ impl Coordinator {
                 .unwrap();
         }

+        if !introspection_source_index_ids.is_empty() {
+            self.initialize_compute_read_policies(
+                introspection_source_index_ids,
+                instance.id,
+                DEFAULT_LOGICAL_COMPACTION_WINDOW_MS,
+            )
+            .await;
+        }
         if !introspection_collection_ids.is_empty() {
             self.initialize_storage_read_policies(
                 introspection_collection_ids,
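The coord.rs, command_handler.rs, and message_handler.rs changes capture an OpenTelemetry context when the purification task is spawned and re-attach it when the CreateSourceStatementReady message is handled, so the trace is not severed by the hop through the internal command channel; without the re-attach, the handler's spans would root a fresh trace. A rough sketch of the same pattern on the public tracing / tracing-opentelemetry / opentelemetry APIs (the StatementReady struct and mpsc channel are illustrative stand-ins, not Materialize's types; mz_ore's OpenTelemetryContext wraps roughly this mechanism):

use opentelemetry::Context;
use tracing::{info_span, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

// Illustrative stand-in for CreateSourceStatementReady.
struct StatementReady {
    result: Result<String, String>,
    otel_ctx: Context,
}

fn enqueue(tx: std::sync::mpsc::Sender<StatementReady>) {
    // Counterpart of `OpenTelemetryContext::obtain()`: snapshot the current
    // span's OpenTelemetry context before handing work to another task.
    let otel_ctx = Span::current().context();
    tx.send(StatementReady {
        result: Ok("purified statement".into()),
        otel_ctx,
    })
    .expect("receiver alive");
}

fn handle(rx: std::sync::mpsc::Receiver<StatementReady>) {
    let msg = rx.recv().expect("sender alive");
    // Counterpart of `otel_ctx.attach_as_parent()`: parent the handler's span
    // on the captured context so the trace continues across the hop.
    let span = info_span!("create_source_statement_ready");
    span.set_parent(msg.otel_ctx);
    let _guard = span.enter();
    tracing::info!(ok = msg.result.is_ok(), "handled purification result");
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    enqueue(tx);
    handle(rx);
}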
diff --git a/src/persist/src/postgres.rs b/src/persist/src/postgres.rs
index 27a90f408833..d527844f58fe 100644
--- a/src/persist/src/postgres.rs
+++ b/src/persist/src/postgres.rs
@@ -175,6 +175,16 @@ impl PostgresConsensus {
             // The lock ID was randomly generated.
             tx.batch_execute("SELECT pg_advisory_xact_lock(135664303235462630);")
                 .await?;
+        } else {
+            // The `consensus` table creates and deletes rows at a high frequency, generating many
+            // tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
+            // these tombstones accumulate, scans over the table become increasingly, and
+            // eventually prohibitively, slow.
+            //
+            // See: https://github.com/MaterializeInc/materialize/issues/13975
+            // See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
+            tx.batch_execute("ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600;")
+                .await?;
         }
         tx.batch_execute(SCHEMA).await?;
         tx.commit().await?;
diff --git a/test/sqllogictest/cluster_log_compaction.slt b/test/sqllogictest/cluster_log_compaction.slt
new file mode 100644
index 000000000000..4f73d49f6e30
--- /dev/null
+++ b/test/sqllogictest/cluster_log_compaction.slt
@@ -0,0 +1,29 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# Ensure that `CREATE CLUSTER` initializes read policies for introspection source indexes.
+
+mode cockroach
+
+statement ok
+CREATE CLUSTER c1 REPLICAS (r (SIZE '1'));
+
+statement ok
+SET CLUSTER TO c1;
+
+statement ok
+BEGIN
+
+# The transaction will force a read hold on this index.
+query T rowsort
+SELECT * FROM mz_arrangement_batches_internal_2;
+----
+
+statement ok
+COMMIT
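The gc.ttlseconds override in postgres.rs is CockroachDB-specific syntax, which is why it lives in a separate branch from the Postgres advisory-lock path. A hypothetical standalone probe for applying and then verifying the same setting with tokio-postgres (the connection string, database name, and the assumption that SHOW ZONE CONFIGURATION exposes a raw_config_sql column are illustrative, not taken from the patch):

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Hypothetical connection string; 26257 is CockroachDB's default SQL port.
    let (client, conn) =
        tokio_postgres::connect("host=localhost port=26257 user=root dbname=consensus", NoTls)
            .await?;
    tokio::spawn(async move {
        if let Err(e) = conn.await {
            eprintln!("connection error: {}", e);
        }
    });

    // Same statement the patch issues: shorten the GC TTL so tombstones from
    // the high-churn table are collected after 600 seconds instead of 25h.
    client
        .batch_execute("ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600;")
        .await?;

    // CockroachDB reports effective zone settings; `raw_config_sql` is the
    // column name as of recent versions (an assumption worth re-checking).
    let row = client
        .query_one("SHOW ZONE CONFIGURATION FOR TABLE consensus;", &[])
        .await?;
    let config: String = row.get("raw_config_sql");
    println!("{config}");
    Ok(())
}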