Skip to content

Commit

Permalink
[persist] Dynamic lease timeouts (MaterializeInc#21822)
Browse files Browse the repository at this point in the history
This is a long-time known-useful knob which has recently become more urgent.
  • Loading branch information
bkirwi authored and danhhz committed Sep 19, 2023
1 parent dde86f6 commit 431a609
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ fn persist_config(config: &SystemVars) -> PersistParameters {
multiplier: config.persist_next_listen_batch_retryer_multiplier(),
clamp: config.persist_next_listen_batch_retryer_clamp(),
}),
reader_lease_duration: Some(config.persist_reader_lease_duration()),
stats_audit_percent: Some(config.persist_stats_audit_percent()),
stats_collection_enabled: Some(config.persist_stats_collection_enabled()),
stats_filter_enabled: Some(config.persist_stats_filter_enabled()),
Expand Down
8 changes: 8 additions & 0 deletions src/persist-client/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! Async runtime extensions.

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};

use mz_ore::task::RuntimeExt;
use tokio::runtime::{Builder, Runtime};
Expand All @@ -36,6 +37,13 @@ impl IsolatedRuntime {
// TODO: choose a more principled `worker_limit`. Right now we use the
// Tokio default, which is presently the number of cores on the machine.
let runtime = Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
// This will wrap around eventually, which is not ideal, but it's important that
// it stays small to fit within OS limits.
format!("persist:{:04x}", id % 0x10000)
})
.enable_all()
.build()
.expect("known to be valid");
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/src/cfg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ message ProtoPersistParameters {
optional uint64 blob_cache_mem_limit_bytes = 14;
mz_proto.ProtoDuration consensus_connection_pool_ttl = 15;
mz_proto.ProtoDuration consensus_connection_pool_ttl_stagger = 16;
mz_proto.ProtoDuration reader_lease_duration = 20;
optional uint64 stats_budget_bytes = 17;
optional ProtoUntrimmableColumns stats_untrimmable_columns = 18;
map<string, bool> feature_flags = 19;
Expand Down
39 changes: 33 additions & 6 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub struct PersistConfig {
/// A clock to use for all leasing and other non-debugging use.
pub now: NowFn,
/// Configurations that can be dynamically updated.
pub(crate) dynamic: Arc<DynamicConfig>,
pub dynamic: Arc<DynamicConfig>,
/// Whether to physically and logically compact batches in blob storage.
pub compaction_enabled: bool,
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
Expand All @@ -133,9 +133,6 @@ pub struct PersistConfig {
/// Length of time after a writer's last operation after which the writer
/// may be expired.
pub writer_lease_duration: Duration,
/// Length of time after a reader's last operation after which the reader
/// may be expired.
pub reader_lease_duration: Duration,
/// Length of time between critical handles' calls to downgrade since
pub critical_downgrade_interval: Duration,
/// Timeout per connection attempt to Persist PubSub service.
Expand Down Expand Up @@ -179,6 +176,7 @@ impl PersistConfig {
),
consensus_connect_timeout: RwLock::new(Self::DEFAULT_CRDB_CONNECT_TIMEOUT),
consensus_tcp_user_timeout: RwLock::new(Self::DEFAULT_CRDB_TCP_USER_TIMEOUT),
reader_lease_duration: RwLock::new(Self::DEFAULT_READ_LEASE_DURATION),
gc_blob_delete_concurrency_limit: AtomicUsize::new(32),
state_versions_recent_live_diffs_limit: AtomicUsize::new(
30 * Self::DEFAULT_ROLLUP_THRESHOLD,
Expand Down Expand Up @@ -215,7 +213,6 @@ impl PersistConfig {
compaction_yield_after_n_updates: 100_000,
consensus_connection_pool_max_size: 50,
writer_lease_duration: 60 * Duration::from_secs(60),
reader_lease_duration: Self::DEFAULT_READ_LEASE_DURATION,
critical_downgrade_interval: Duration::from_secs(30),
pubsub_connect_attempt_timeout: Duration::from_secs(5),
pubsub_connect_max_backoff: Duration::from_secs(60),
Expand Down Expand Up @@ -340,7 +337,7 @@ impl PersistConfig {
//
// MIGRATION: Remove this once we remove the ReaderState <->
// ProtoReaderState migration.
pub(crate) const DEFAULT_READ_LEASE_DURATION: Duration = Duration::from_secs(60 * 15);
pub const DEFAULT_READ_LEASE_DURATION: Duration = Duration::from_secs(60 * 15);

// TODO: Get rid of this in favor of using PersistParameters at the
// relevant callsites.
Expand Down Expand Up @@ -417,6 +414,7 @@ pub struct DynamicConfig {
consensus_tcp_user_timeout: RwLock<Duration>,
consensus_connection_pool_ttl: RwLock<Duration>,
consensus_connection_pool_ttl_stagger: RwLock<Duration>,
reader_lease_duration: RwLock<Duration>,
sink_minimum_batch_updates: AtomicUsize,
storage_sink_minimum_batch_updates: AtomicUsize,
stats_audit_percent: AtomicUsize,
Expand Down Expand Up @@ -598,6 +596,18 @@ impl DynamicConfig {
.expect("lock poisoned")
}

/// How long a reader may go without performing an operation before it
/// becomes eligible for expiry.
///
/// # Panics
///
/// Panics if the lock guarding the value has been poisoned.
pub fn reader_lease_duration(&self) -> Duration {
    let lease = self.reader_lease_duration.read().expect("lock poisoned");
    *lease
}

/// Replaces the duration a reader may go without performing an operation
/// before it becomes eligible for expiry.
///
/// # Panics
///
/// Panics if the lock guarding the value has been poisoned.
pub fn set_reader_lease_duration(&self, d: Duration) {
    let mut lease = self.reader_lease_duration.write().expect("lock poisoned");
    *lease = d;
}

/// The maximum number of concurrent blob deletes during garbage collection.
pub fn gc_blob_delete_concurrency_limit(&self) -> usize {
self.gc_blob_delete_concurrency_limit
Expand Down Expand Up @@ -756,6 +766,8 @@ pub struct PersistParameters {
pub consensus_connection_pool_ttl_stagger: Option<Duration>,
/// Configures [`DynamicConfig::next_listen_batch_retry_params`].
pub next_listen_batch_retryer: Option<RetryParameters>,
/// Configures [`DynamicConfig::reader_lease_duration`].
pub reader_lease_duration: Option<Duration>,
/// Configures [`PersistConfig::sink_minimum_batch_updates`].
pub sink_minimum_batch_updates: Option<usize>,
/// Configures [`PersistConfig::storage_sink_minimum_batch_updates`].
Expand Down Expand Up @@ -793,6 +805,7 @@ impl PersistParameters {
consensus_tcp_user_timeout: self_consensus_tcp_user_timeout,
consensus_connection_pool_ttl: self_consensus_connection_pool_ttl,
consensus_connection_pool_ttl_stagger: self_consensus_connection_pool_ttl_stagger,
reader_lease_duration: self_reader_lease_duration,
sink_minimum_batch_updates: self_sink_minimum_batch_updates,
storage_sink_minimum_batch_updates: self_storage_sink_minimum_batch_updates,
next_listen_batch_retryer: self_next_listen_batch_retryer,
Expand All @@ -814,6 +827,7 @@ impl PersistParameters {
consensus_tcp_user_timeout: other_consensus_tcp_user_timeout,
consensus_connection_pool_ttl: other_consensus_connection_pool_ttl,
consensus_connection_pool_ttl_stagger: other_consensus_connection_pool_ttl_stagger,
reader_lease_duration: other_reader_lease_duration,
sink_minimum_batch_updates: other_sink_minimum_batch_updates,
storage_sink_minimum_batch_updates: other_storage_sink_minimum_batch_updates,
next_listen_batch_retryer: other_next_listen_batch_retryer,
Expand Down Expand Up @@ -848,6 +862,9 @@ impl PersistParameters {
if let Some(v) = other_consensus_connection_pool_ttl_stagger {
*self_consensus_connection_pool_ttl_stagger = Some(v);
}
if let Some(v) = other_reader_lease_duration {
*self_reader_lease_duration = Some(v);
}
if let Some(v) = other_sink_minimum_batch_updates {
*self_sink_minimum_batch_updates = Some(v);
}
Expand Down Expand Up @@ -898,6 +915,7 @@ impl PersistParameters {
consensus_tcp_user_timeout,
consensus_connection_pool_ttl,
consensus_connection_pool_ttl_stagger,
reader_lease_duration,
sink_minimum_batch_updates,
storage_sink_minimum_batch_updates,
next_listen_batch_retryer,
Expand All @@ -918,6 +936,7 @@ impl PersistParameters {
&& consensus_tcp_user_timeout.is_none()
&& consensus_connection_pool_ttl.is_none()
&& consensus_connection_pool_ttl_stagger.is_none()
&& reader_lease_duration.is_none()
&& sink_minimum_batch_updates.is_none()
&& storage_sink_minimum_batch_updates.is_none()
&& next_listen_batch_retryer.is_none()
Expand Down Expand Up @@ -947,6 +966,7 @@ impl PersistParameters {
consensus_tcp_user_timeout,
consensus_connection_pool_ttl,
consensus_connection_pool_ttl_stagger,
reader_lease_duration,
sink_minimum_batch_updates,
storage_sink_minimum_batch_updates,
next_listen_batch_retryer,
Expand Down Expand Up @@ -1010,6 +1030,10 @@ impl PersistParameters {
.expect("lock poisoned");
*stagger = *consensus_connection_pool_ttl_stagger;
}
if let Some(reader_lease_duration) = reader_lease_duration {
cfg.dynamic
.set_reader_lease_duration(*reader_lease_duration);
}
if let Some(sink_minimum_batch_updates) = sink_minimum_batch_updates {
cfg.dynamic
.sink_minimum_batch_updates
Expand Down Expand Up @@ -1092,6 +1116,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
consensus_connection_pool_ttl_stagger: self
.consensus_connection_pool_ttl_stagger
.into_proto(),
reader_lease_duration: self.reader_lease_duration.into_proto(),
sink_minimum_batch_updates: self.sink_minimum_batch_updates.into_proto(),
storage_sink_minimum_batch_updates: self
.storage_sink_minimum_batch_updates
Expand Down Expand Up @@ -1120,6 +1145,8 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
consensus_connection_pool_ttl_stagger: proto
.consensus_connection_pool_ttl_stagger
.into_rust()?,
reader_lease_duration: proto.reader_lease_duration.into_rust()?,

sink_minimum_batch_updates: proto.sink_minimum_batch_updates.into_rust()?,
storage_sink_minimum_batch_updates: proto
.storage_sink_minimum_batch_updates
Expand Down
4 changes: 2 additions & 2 deletions src/persist-client/src/internal/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ where
reader_id
);
isolated_runtime.spawn_named(|| name, async move {
let sleep_duration = machine.applier.cfg.reader_lease_duration / 2;
let sleep_duration = machine.applier.cfg.dynamic.reader_lease_duration() / 2;
loop {
let before_sleep = Instant::now();
tokio::time::sleep(sleep_duration).await;
Expand Down Expand Up @@ -1692,7 +1692,7 @@ pub mod datadriven {
.register_leased_reader(
&reader_id,
"tests",
datadriven.client.cfg.reader_lease_duration,
datadriven.client.cfg.dynamic.reader_lease_duration(),
(datadriven.client.cfg.now)(),
)
.await;
Expand Down
7 changes: 5 additions & 2 deletions src/persist-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl PersistClient {
.register_leased_reader(
&reader_id,
&diagnostics.handle_purpose,
self.cfg.reader_lease_duration,
self.cfg.dynamic.reader_lease_duration(),
heartbeat_ts,
)
.await;
Expand Down Expand Up @@ -1972,7 +1972,10 @@ mod tests {
// Verify that the ReadHandle and WriteHandle background heartbeat tasks
// shut down cleanly after the handle is expired.
let mut cache = new_test_client_cache();
cache.cfg.reader_lease_duration = Duration::from_millis(1);
cache
.cfg
.dynamic
.set_reader_lease_duration(Duration::from_millis(1));
cache.cfg.writer_lease_duration = Duration::from_millis(1);
let (_write, mut read) = cache
.open(PersistLocation::new_in_mem())
Expand Down
10 changes: 6 additions & 4 deletions src/persist-client/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ where
.register_leased_reader(
&new_reader_id,
purpose,
self.cfg.reader_lease_duration,
self.cfg.dynamic.reader_lease_duration(),
heartbeat_ts,
)
.await;
Expand Down Expand Up @@ -846,7 +846,7 @@ where
// NB: min_elapsed is intentionally smaller than the one in
// maybe_heartbeat_reader (this is the preferential treatment mentioned
// above).
let min_elapsed = self.cfg.reader_lease_duration / 4;
let min_elapsed = self.cfg.dynamic.reader_lease_duration() / 4;
let elapsed_since_last_heartbeat =
Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
if elapsed_since_last_heartbeat >= min_elapsed {
Expand All @@ -861,12 +861,14 @@ where
/// or [Self::maybe_downgrade_since] on some interval that is "frequent"
/// compared to the reader lease duration (see `DynamicConfig::reader_lease_duration`).
pub(crate) async fn maybe_heartbeat_reader(&mut self) {
let min_elapsed = self.cfg.reader_lease_duration / 2;
let min_elapsed = self.cfg.dynamic.reader_lease_duration() / 2;
let heartbeat_ts = (self.cfg.now)();
let elapsed_since_last_heartbeat =
Duration::from_millis(heartbeat_ts.saturating_sub(self.last_heartbeat));
if elapsed_since_last_heartbeat >= min_elapsed {
if elapsed_since_last_heartbeat > self.machine.applier.cfg.reader_lease_duration {
if elapsed_since_last_heartbeat
> self.machine.applier.cfg.dynamic.reader_lease_duration()
{
warn!(
"reader ({}) of shard ({}) went {}s between heartbeats",
self.reader_id,
Expand Down
14 changes: 14 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,14 @@ const PERSIST_CONSENSUS_CONNECTION_POOL_TTL: ServerVar<Duration> = ServerVar {
internal: true,
};

/// Controls [`mz_persist_client::cfg::DynamicConfig::reader_lease_duration`].
const PERSIST_READER_LEASE_DURATION: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("persist_reader_lease_duration"),
value: &PersistConfig::DEFAULT_READ_LEASE_DURATION,
description: "The time after which we'll clean up stale read leases",
internal: true,
};

/// Controls [`mz_persist_client::cfg::DynamicConfig::consensus_connection_pool_ttl_stagger`].
const PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("persist_consensus_connection_pool_ttl_stagger"),
Expand Down Expand Up @@ -2282,6 +2290,7 @@ impl SystemVars {
.with_var(&PERSIST_COMPACTION_MINIMUM_TIMEOUT)
.with_var(&PERSIST_CONSENSUS_CONNECTION_POOL_TTL)
.with_var(&PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
.with_var(&PERSIST_READER_LEASE_DURATION)
.with_var(&CRDB_CONNECT_TIMEOUT)
.with_var(&CRDB_TCP_USER_TIMEOUT)
.with_var(&DATAFLOW_MAX_INFLIGHT_BYTES)
Expand Down Expand Up @@ -2773,6 +2782,10 @@ impl SystemVars {
*self.expect_value(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP)
}

/// Returns the `persist_reader_lease_duration` configuration parameter.
pub fn persist_reader_lease_duration(&self) -> Duration {
*self.expect_value(&PERSIST_READER_LEASE_DURATION)
}

/// Returns the `persist_compaction_minimum_timeout` configuration parameter.
pub fn persist_compaction_minimum_timeout(&self) -> Duration {
*self.expect_value(&PERSIST_COMPACTION_MINIMUM_TIMEOUT)
Expand Down Expand Up @@ -4574,6 +4587,7 @@ fn is_persist_config_var(name: &str) -> bool {
|| name == PERSIST_COMPACTION_MINIMUM_TIMEOUT.name()
|| name == PERSIST_CONSENSUS_CONNECTION_POOL_TTL.name()
|| name == PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER.name()
|| name == PERSIST_READER_LEASE_DURATION.name()
|| name == CRDB_CONNECT_TIMEOUT.name()
|| name == CRDB_TCP_USER_TIMEOUT.name()
|| name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
Expand Down
6 changes: 4 additions & 2 deletions src/storage/src/source/reclock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,10 @@ mod tests {
static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);

static PERSIST_CACHE: Lazy<Arc<PersistClientCache>> = Lazy::new(|| {
let mut persistcfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
persistcfg.reader_lease_duration = PERSIST_READER_LEASE_TIMEOUT_MS;
let persistcfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
persistcfg
.dynamic
.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
Arc::new(PersistClientCache::new(
persistcfg,
&MetricsRegistry::new(),
Expand Down
4 changes: 3 additions & 1 deletion src/storage/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ where
let decode_metrics = DecodeMetrics::register_with(&metrics_registry);

let mut persistcfg = PersistConfig::new(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
persistcfg.reader_lease_duration = std::time::Duration::from_secs(60 * 15);
persistcfg
.dynamic
.set_reader_lease_duration(Duration::from_secs(60 * 15));
persistcfg.now = SYSTEM_TIME.clone();

let persist_location = mz_persist_client::PersistLocation {
Expand Down

0 comments on commit 431a609

Please sign in to comment.