From 431a6095bcfe31227b3203adccc294e0ebc7cb70 Mon Sep 17 00:00:00 2001
From: Ben Kirwin
Date: Tue, 19 Sep 2023 17:27:42 -0400
Subject: [PATCH] [persist] Dynamic lease timeouts (#21822)

This knob has long been known to be useful, and it has recently become
more urgent.
---
 src/adapter/src/flags.rs                   |  1 +
 src/persist-client/src/async_runtime.rs    |  8 +++++
 src/persist-client/src/cfg.proto           |  1 +
 src/persist-client/src/cfg.rs              | 39 ++++++++++++++++++----
 src/persist-client/src/internal/machine.rs |  4 +--
 src/persist-client/src/lib.rs              |  7 ++--
 src/persist-client/src/read.rs             | 10 +++---
 src/sql/src/session/vars.rs                | 14 ++++++++
 src/storage/src/source/reclock.rs          |  6 ++--
 src/storage/tests/setup.rs                 |  4 ++-
 10 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs
index aabd4a8a6ce7..983df3b6f1ec 100644
--- a/src/adapter/src/flags.rs
+++ b/src/adapter/src/flags.rs
@@ -158,6 +158,7 @@ fn persist_config(config: &SystemVars) -> PersistParameters {
             multiplier: config.persist_next_listen_batch_retryer_multiplier(),
             clamp: config.persist_next_listen_batch_retryer_clamp(),
         }),
+        reader_lease_duration: Some(config.persist_reader_lease_duration()),
        stats_audit_percent: Some(config.persist_stats_audit_percent()),
        stats_collection_enabled: Some(config.persist_stats_collection_enabled()),
        stats_filter_enabled: Some(config.persist_stats_filter_enabled()),
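
A note on the shape of the change: the lease duration now lives behind an
RwLock in DynamicConfig rather than in a plain struct field, so it can be
changed on a running process without a restart. A minimal sketch of the new
accessors (added in cfg.rs below), reusing the DUMMY_BUILD_INFO and
SYSTEM_TIME stand-ins that appear in the test diffs at the end of this patch:

    use std::time::Duration;

    let cfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
    // No `mut` needed: the value sits behind an RwLock inside `cfg.dynamic`.
    cfg.dynamic.set_reader_lease_duration(Duration::from_secs(60));
    assert_eq!(cfg.dynamic.reader_lease_duration(), Duration::from_secs(60));
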
diff --git a/src/persist-client/src/async_runtime.rs b/src/persist-client/src/async_runtime.rs
index 05cf31046428..ffa5cd776056 100644
--- a/src/persist-client/src/async_runtime.rs
+++ b/src/persist-client/src/async_runtime.rs
@@ -10,6 +10,7 @@
 //! Async runtime extensions.
 
 use std::future::Future;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use mz_ore::task::RuntimeExt;
 use tokio::runtime::{Builder, Runtime};
@@ -36,6 +37,13 @@ impl IsolatedRuntime {
         // TODO: choose a more principled `worker_limit`. Right now we use the
         // Tokio default, which is presently the number of cores on the machine.
         let runtime = Builder::new_multi_thread()
+            .thread_name_fn(|| {
+                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
+                let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
+                // This will wrap around eventually, which is not ideal, but it's important that
+                // it stays small to fit within OS limits.
+                format!("persist:{:04x}", id % 0x10000)
+            })
             .enable_all()
             .build()
             .expect("known to be valid");
diff --git a/src/persist-client/src/cfg.proto b/src/persist-client/src/cfg.proto
index 6cd5a283bea4..6afd83c80b51 100644
--- a/src/persist-client/src/cfg.proto
+++ b/src/persist-client/src/cfg.proto
@@ -30,6 +30,7 @@ message ProtoPersistParameters {
   optional uint64 blob_cache_mem_limit_bytes = 14;
   mz_proto.ProtoDuration consensus_connection_pool_ttl = 15;
   mz_proto.ProtoDuration consensus_connection_pool_ttl_stagger = 16;
+  mz_proto.ProtoDuration reader_lease_duration = 20;
   optional uint64 stats_budget_bytes = 17;
   optional ProtoUntrimmableColumns stats_untrimmable_columns = 18;
   map<string, bool> feature_flags = 19;
diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs
index be081b43bcbc..7bc005baec5a 100644
--- a/src/persist-client/src/cfg.rs
+++ b/src/persist-client/src/cfg.rs
@@ -115,7 +115,7 @@ pub struct PersistConfig {
     /// A clock to use for all leasing and other non-debugging use.
     pub now: NowFn,
     /// Configurations that can be dynamically updated.
-    pub(crate) dynamic: Arc<DynamicConfig>,
+    pub dynamic: Arc<DynamicConfig>,
     /// Whether to physically and logically compact batches in blob storage.
     pub compaction_enabled: bool,
     /// In Compactor::compact_and_apply_background, the maximum number of concurrent
@@ -133,9 +133,6 @@ pub struct PersistConfig {
     /// Length of time after a writer's last operation after which the writer
     /// may be expired.
     pub writer_lease_duration: Duration,
-    /// Length of time after a reader's last operation after which the reader
-    /// may be expired.
-    pub reader_lease_duration: Duration,
     /// Length of time between critical handles' calls to downgrade since
     pub critical_downgrade_interval: Duration,
     /// Timeout per connection attempt to Persist PubSub service.
@@ -179,6 +176,7 @@ impl PersistConfig {
                 ),
                 consensus_connect_timeout: RwLock::new(Self::DEFAULT_CRDB_CONNECT_TIMEOUT),
                 consensus_tcp_user_timeout: RwLock::new(Self::DEFAULT_CRDB_TCP_USER_TIMEOUT),
+                reader_lease_duration: RwLock::new(Self::DEFAULT_READ_LEASE_DURATION),
                 gc_blob_delete_concurrency_limit: AtomicUsize::new(32),
                 state_versions_recent_live_diffs_limit: AtomicUsize::new(
                     30 * Self::DEFAULT_ROLLUP_THRESHOLD,
@@ -215,7 +213,6 @@ impl PersistConfig {
             compaction_yield_after_n_updates: 100_000,
             consensus_connection_pool_max_size: 50,
             writer_lease_duration: 60 * Duration::from_secs(60),
-            reader_lease_duration: Self::DEFAULT_READ_LEASE_DURATION,
             critical_downgrade_interval: Duration::from_secs(30),
             pubsub_connect_attempt_timeout: Duration::from_secs(5),
             pubsub_connect_max_backoff: Duration::from_secs(60),
@@ -340,7 +337,7 @@ impl PersistConfig {
     //
     // MIGRATION: Remove this once we remove the ReaderState <->
     // ProtoReaderState migration.
-    pub(crate) const DEFAULT_READ_LEASE_DURATION: Duration = Duration::from_secs(60 * 15);
+    pub const DEFAULT_READ_LEASE_DURATION: Duration = Duration::from_secs(60 * 15);
 
     // TODO: Get rid of this in favor of using PersistParameters at the
     // relevant callsites.
@@ -417,6 +414,7 @@ pub struct DynamicConfig {
     consensus_tcp_user_timeout: RwLock<Duration>,
     consensus_connection_pool_ttl: RwLock<Duration>,
     consensus_connection_pool_ttl_stagger: RwLock<Duration>,
+    reader_lease_duration: RwLock<Duration>,
     sink_minimum_batch_updates: AtomicUsize,
     storage_sink_minimum_batch_updates: AtomicUsize,
     stats_audit_percent: AtomicUsize,
@@ -598,6 +596,18 @@ impl DynamicConfig {
             .expect("lock poisoned")
     }
 
+    /// Length of time after a reader's last operation after which the reader
+    /// may be expired.
+    pub fn reader_lease_duration(&self) -> Duration {
+        *self.reader_lease_duration.read().expect("lock poisoned")
+    }
+
+    /// Set the length of time after a reader's last operation after which the reader
+    /// may be expired.
+    pub fn set_reader_lease_duration(&self, d: Duration) {
+        *self.reader_lease_duration.write().expect("lock poisoned") = d;
+    }
+
     /// The maximum number of concurrent blob deletes during garbage collection.
     pub fn gc_blob_delete_concurrency_limit(&self) -> usize {
         self.gc_blob_delete_concurrency_limit
@@ -756,6 +766,8 @@ pub struct PersistParameters {
     /// Configures [`DynamicConfig::consensus_connection_pool_ttl_stagger`].
     pub consensus_connection_pool_ttl_stagger: Option<Duration>,
     /// Configures [`DynamicConfig::next_listen_batch_retry_params`].
     pub next_listen_batch_retryer: Option<RetryParameters>,
+    /// Configures [`DynamicConfig::reader_lease_duration`].
+    pub reader_lease_duration: Option<Duration>,
     /// Configures [`PersistConfig::sink_minimum_batch_updates`].
     pub sink_minimum_batch_updates: Option<usize>,
     /// Configures [`PersistConfig::storage_sink_minimum_batch_updates`].
@@ -793,6 +805,7 @@ impl PersistParameters {
             consensus_tcp_user_timeout: self_consensus_tcp_user_timeout,
             consensus_connection_pool_ttl: self_consensus_connection_pool_ttl,
             consensus_connection_pool_ttl_stagger: self_consensus_connection_pool_ttl_stagger,
+            reader_lease_duration: self_reader_lease_duration,
             sink_minimum_batch_updates: self_sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates: self_storage_sink_minimum_batch_updates,
             next_listen_batch_retryer: self_next_listen_batch_retryer,
@@ -814,6 +827,7 @@ impl PersistParameters {
             consensus_tcp_user_timeout: other_consensus_tcp_user_timeout,
             consensus_connection_pool_ttl: other_consensus_connection_pool_ttl,
             consensus_connection_pool_ttl_stagger: other_consensus_connection_pool_ttl_stagger,
+            reader_lease_duration: other_reader_lease_duration,
             sink_minimum_batch_updates: other_sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates: other_storage_sink_minimum_batch_updates,
             next_listen_batch_retryer: other_next_listen_batch_retryer,
@@ -848,6 +862,9 @@ impl PersistParameters {
         if let Some(v) = other_consensus_connection_pool_ttl_stagger {
             *self_consensus_connection_pool_ttl_stagger = Some(v);
         }
+        if let Some(v) = other_reader_lease_duration {
+            *self_reader_lease_duration = Some(v);
+        }
         if let Some(v) = other_sink_minimum_batch_updates {
             *self_sink_minimum_batch_updates = Some(v);
         }
@@ -898,6 +915,7 @@ impl PersistParameters {
             consensus_tcp_user_timeout,
             consensus_connection_pool_ttl,
             consensus_connection_pool_ttl_stagger,
+            reader_lease_duration,
             sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates,
             next_listen_batch_retryer,
@@ -918,6 +936,7 @@ impl PersistParameters {
             && consensus_tcp_user_timeout.is_none()
             && consensus_connection_pool_ttl.is_none()
             && consensus_connection_pool_ttl_stagger.is_none()
+            && reader_lease_duration.is_none()
             && sink_minimum_batch_updates.is_none()
             && storage_sink_minimum_batch_updates.is_none()
             && next_listen_batch_retryer.is_none()
@@ -947,6 +966,7 @@ impl PersistParameters {
             consensus_tcp_user_timeout,
             consensus_connection_pool_ttl,
             consensus_connection_pool_ttl_stagger,
+            reader_lease_duration,
             sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates,
             next_listen_batch_retryer,
@@ -1010,6 +1030,10 @@ impl PersistParameters {
                 .expect("lock poisoned");
             *stagger = *consensus_connection_pool_ttl_stagger;
         }
+        if let Some(reader_lease_duration) = reader_lease_duration {
+            cfg.dynamic
+                .set_reader_lease_duration(*reader_lease_duration);
+        }
         if let Some(sink_minimum_batch_updates) = sink_minimum_batch_updates {
             cfg.dynamic
                 .sink_minimum_batch_updates
@@ -1092,6 +1116,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
             consensus_connection_pool_ttl_stagger: self
                 .consensus_connection_pool_ttl_stagger
                 .into_proto(),
+            reader_lease_duration: self.reader_lease_duration.into_proto(),
             sink_minimum_batch_updates: self.sink_minimum_batch_updates.into_proto(),
             storage_sink_minimum_batch_updates: self
                 .storage_sink_minimum_batch_updates
                 .into_proto(),
@@ -1120,6 +1145,8 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
             consensus_connection_pool_ttl_stagger: proto
                 .consensus_connection_pool_ttl_stagger
                 .into_rust()?,
+            reader_lease_duration: proto.reader_lease_duration.into_rust()?,
+
             sink_minimum_batch_updates: proto.sink_minimum_batch_updates.into_rust()?,
             storage_sink_minimum_batch_updates: proto
                 .storage_sink_minimum_batch_updates
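
PersistParameters is the unit that flows from the adapter into persist: a
Some overwrites the running value, a None leaves it untouched. A sketch of
shipping only the reader-lease override, assuming PersistParameters derives
Default and that the destructuring above lives in its update/apply methods
(the `apply` name is an assumption; `cfg` is the PersistConfig from the
earlier sketch):

    use std::time::Duration;

    // Override only the reader lease; every other knob stays None.
    let params = PersistParameters {
        reader_lease_duration: Some(Duration::from_secs(30 * 60)),
        ..Default::default()
    };
    // Assumed entry point: the method containing the
    // `set_reader_lease_duration` call shown above.
    params.apply(&cfg);
    assert_eq!(cfg.dynamic.reader_lease_duration(), Duration::from_secs(30 * 60));
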
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index e78e520e2912..2f99be90087b 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -899,7 +899,7 @@ where
             reader_id
         );
         isolated_runtime.spawn_named(|| name, async move {
-            let sleep_duration = machine.applier.cfg.reader_lease_duration / 2;
+            let sleep_duration = machine.applier.cfg.dynamic.reader_lease_duration() / 2;
             loop {
                 let before_sleep = Instant::now();
                 tokio::time::sleep(sleep_duration).await;
@@ -1692,7 +1692,7 @@ pub mod datadriven {
             .register_leased_reader(
                 &reader_id,
                 "tests",
-                datadriven.client.cfg.reader_lease_duration,
+                datadriven.client.cfg.dynamic.reader_lease_duration(),
                 (datadriven.client.cfg.now)(),
             )
             .await;
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 70c00e54cc37..63b7746057ce 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -433,7 +433,7 @@ impl PersistClient {
             .register_leased_reader(
                 &reader_id,
                 &diagnostics.handle_purpose,
-                self.cfg.reader_lease_duration,
+                self.cfg.dynamic.reader_lease_duration(),
                 heartbeat_ts,
             )
             .await;
@@ -1972,7 +1972,10 @@ mod tests {
         // Verify that the ReadHandle and WriteHandle background heartbeat tasks
         // shut down cleanly after the handle is expired.
         let mut cache = new_test_client_cache();
-        cache.cfg.reader_lease_duration = Duration::from_millis(1);
+        cache
+            .cfg
+            .dynamic
+            .set_reader_lease_duration(Duration::from_millis(1));
         cache.cfg.writer_lease_duration = Duration::from_millis(1);
         let (_write, mut read) = cache
             .open(PersistLocation::new_in_mem())
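
On cadence: with the default 15-minute lease, the background task above
sleeps 7.5 minutes between heartbeats, so a healthy reader renews roughly
twice per lease window. Note that sleep_duration is computed once, before
the loop, so a heartbeat task that is already running keeps its old cadence;
only newly spawned tasks observe a changed lease. The arithmetic, as a
sketch:

    use std::time::Duration;

    let lease = Duration::from_secs(60 * 15); // DEFAULT_READ_LEASE_DURATION
    let heartbeat_sleep = lease / 2; // background task in machine.rs
    let downgrade_min = lease / 4; // preferential path in read.rs below
    assert_eq!(heartbeat_sleep, Duration::from_secs(450));
    assert_eq!(downgrade_min, Duration::from_secs(225));
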
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs
index 42f433ca4cde..abbc6d87ed65 100644
--- a/src/persist-client/src/read.rs
+++ b/src/persist-client/src/read.rs
@@ -810,7 +810,7 @@ where
             .register_leased_reader(
                 &new_reader_id,
                 purpose,
-                self.cfg.reader_lease_duration,
+                self.cfg.dynamic.reader_lease_duration(),
                 heartbeat_ts,
             )
             .await;
@@ -846,7 +846,7 @@ where
         // NB: min_elapsed is intentionally smaller than the one in
         // maybe_heartbeat_reader (this is the preferential treatment mentioned
         // above).
-        let min_elapsed = self.cfg.reader_lease_duration / 4;
+        let min_elapsed = self.cfg.dynamic.reader_lease_duration() / 4;
         let elapsed_since_last_heartbeat =
             Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
         if elapsed_since_last_heartbeat >= min_elapsed {
@@ -861,12 +861,14 @@ where
     /// or [Self::maybe_downgrade_since] on some interval that is "frequent"
     /// compared to PersistConfig::FAKE_READ_LEASE_DURATION.
     pub(crate) async fn maybe_heartbeat_reader(&mut self) {
-        let min_elapsed = self.cfg.reader_lease_duration / 2;
+        let min_elapsed = self.cfg.dynamic.reader_lease_duration() / 2;
         let heartbeat_ts = (self.cfg.now)();
         let elapsed_since_last_heartbeat =
             Duration::from_millis(heartbeat_ts.saturating_sub(self.last_heartbeat));
         if elapsed_since_last_heartbeat >= min_elapsed {
-            if elapsed_since_last_heartbeat > self.machine.applier.cfg.reader_lease_duration {
+            if elapsed_since_last_heartbeat
+                > self.machine.applier.cfg.dynamic.reader_lease_duration()
+            {
                 warn!(
                     "reader ({}) of shard ({}) went {}s between heartbeats",
                     self.reader_id,
diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs
index 44faf877056d..4e270be99141 100644
--- a/src/sql/src/session/vars.rs
+++ b/src/sql/src/session/vars.rs
@@ -595,6 +595,14 @@ const PERSIST_CONSENSUS_CONNECTION_POOL_TTL: ServerVar<Duration> = ServerVar {
     internal: true,
 };
 
+/// Controls [`mz_persist_client::cfg::DynamicConfig::reader_lease_duration`].
+const PERSIST_READER_LEASE_DURATION: ServerVar<Duration> = ServerVar {
+    name: UncasedStr::new("persist_reader_lease_duration"),
+    value: &PersistConfig::DEFAULT_READ_LEASE_DURATION,
+    description: "The time after which we'll clean up stale read leases",
+    internal: true,
+};
+
 /// Controls [`mz_persist_client::cfg::DynamicConfig::consensus_connection_pool_ttl_stagger`].
 const PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER: ServerVar<Duration> = ServerVar {
     name: UncasedStr::new("persist_consensus_connection_pool_ttl_stagger"),
@@ -2282,6 +2290,7 @@ impl SystemVars {
             .with_var(&PERSIST_COMPACTION_MINIMUM_TIMEOUT)
             .with_var(&PERSIST_CONSENSUS_CONNECTION_POOL_TTL)
             .with_var(&PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
+            .with_var(&PERSIST_READER_LEASE_DURATION)
             .with_var(&CRDB_CONNECT_TIMEOUT)
             .with_var(&CRDB_TCP_USER_TIMEOUT)
             .with_var(&DATAFLOW_MAX_INFLIGHT_BYTES)
@@ -2773,6 +2782,10 @@ impl SystemVars {
         *self.expect_value(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP)
     }
 
+    pub fn persist_reader_lease_duration(&self) -> Duration {
+        *self.expect_value(&PERSIST_READER_LEASE_DURATION)
+    }
+
     /// Returns the `persist_compaction_minimum_timeout` configuration parameter.
     pub fn persist_compaction_minimum_timeout(&self) -> Duration {
         *self.expect_value(&PERSIST_COMPACTION_MINIMUM_TIMEOUT)
@@ -4574,6 +4587,7 @@ fn is_persist_config_var(name: &str) -> bool {
         || name == PERSIST_COMPACTION_MINIMUM_TIMEOUT.name()
         || name == PERSIST_CONSENSUS_CONNECTION_POOL_TTL.name()
         || name == PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER.name()
+        || name == PERSIST_READER_LEASE_DURATION.name()
         || name == CRDB_CONNECT_TIMEOUT.name()
         || name == CRDB_TCP_USER_TIMEOUT.name()
         || name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
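
One subtlety in the checks above: last_heartbeat and (self.cfg.now)() are
millisecond timestamps, and the subtraction saturates, so a clock that steps
backwards yields a zero elapsed time (and therefore a skipped heartbeat)
rather than an underflow panic. A small sketch of that edge case:

    use std::time::Duration;

    // NowFn-style timestamps: milliseconds as u64.
    let last_heartbeat: u64 = 1_300_000;
    let now: u64 = 1_000_000; // clock stepped backwards
    let elapsed = Duration::from_millis(now.saturating_sub(last_heartbeat));
    assert_eq!(elapsed, Duration::ZERO); // saturates instead of underflowing
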
diff --git a/src/storage/src/source/reclock.rs b/src/storage/src/source/reclock.rs
index 9bce76761120..57552b9218d4 100644
--- a/src/storage/src/source/reclock.rs
+++ b/src/storage/src/source/reclock.rs
@@ -634,8 +634,10 @@ mod tests {
     static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);
 
     static PERSIST_CACHE: Lazy<Arc<PersistClientCache>> = Lazy::new(|| {
-        let mut persistcfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
-        persistcfg.reader_lease_duration = PERSIST_READER_LEASE_TIMEOUT_MS;
+        let persistcfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
+        persistcfg
+            .dynamic
+            .set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
         Arc::new(PersistClientCache::new(
             persistcfg,
             &MetricsRegistry::new(),
diff --git a/src/storage/tests/setup.rs b/src/storage/tests/setup.rs
index 7d95f4bb164a..65269df317e6 100644
--- a/src/storage/tests/setup.rs
+++ b/src/storage/tests/setup.rs
@@ -200,7 +200,9 @@ where
     let decode_metrics = DecodeMetrics::register_with(&metrics_registry);
 
     let mut persistcfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
-    persistcfg.reader_lease_duration = std::time::Duration::from_secs(60 * 15);
+    persistcfg
+        .dynamic
+        .set_reader_lease_duration(Duration::from_secs(60 * 15));
     persistcfg.now = SYSTEM_TIME.clone();
 
     let persist_location = mz_persist_client::PersistLocation {