Skip to content

Commit

Permalink
persist: set a timeout on compaction requests (MaterializeInc#15260)
Browse files Browse the repository at this point in the history
  • Loading branch information
pH14 authored and Sean Loiselle committed Oct 18, 2022
1 parent 6ca4bf6 commit c333062
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 18 deletions.
68 changes: 51 additions & 17 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::TryFutureExt;
use mz_ore::cast::CastFrom;
use mz_persist::indexed::columnar::{
ColumnarRecordsBuilder, ColumnarRecordsVecBuilder, KEY_VAL_DATA_MAX_LEN,
Expand All @@ -32,15 +33,15 @@ use timely::PartialOrder;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, oneshot, TryAcquireError};
use tracing::log::warn;
use tracing::{debug, debug_span, info, Instrument, Span};
use tracing::{debug, debug_span, info, trace, Instrument, Span};

use crate::async_runtime::CpuHeavyRuntime;
use crate::batch::BatchParts;
use crate::fetch::fetch_batch_part;
use crate::internal::machine::{retry_external, Machine};
use crate::internal::state::{HollowBatch, HollowBatchPart};
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::{Metrics, PersistConfig, ShardId, WriterId};
use crate::{Metrics, PersistConfig, ShardId, WriterId, MB};

/// A request for compaction.
///
Expand Down Expand Up @@ -141,23 +142,56 @@ where
metrics.compaction.started.inc();
let start = Instant::now();

// Compaction is cpu intensive, so be polite and spawn it on the CPU heavy runtime.
// Pick a timeout for our compaction request proportional to the amount
// of data that must be read (with a minimum set by PersistConfig).
let total_input_bytes = req
.inputs
.iter()
.flat_map(|batch| batch.parts.iter())
.map(|parts| parts.encoded_size_bytes)
.sum::<usize>();
let timeout = Duration::max(
// either our minimum timeout
cfg.compaction_minimum_timeout,
// or 1s per MB of input data
Duration::from_secs(u64::cast_from(total_input_bytes / MB)),
);

trace!(
"compaction request for {}MBs ({} bytes), with timeout of {}s.",
total_input_bytes / MB,
total_input_bytes,
timeout.as_secs_f64()
);

let compact_span = debug_span!("compact::consolidate");
let res = cpu_heavy_runtime
.spawn_named(
|| "persist::compact::consolidate",
Self::compact(
cfg.clone(),
Arc::clone(&blob),
Arc::clone(&metrics),
Arc::clone(&cpu_heavy_runtime),
req,
writer_id,
let res = tokio::time::timeout(
timeout,
// Compaction is cpu intensive, so be polite and spawn it on the CPU heavy runtime.
cpu_heavy_runtime
.spawn_named(
|| "persist::compact::consolidate",
Self::compact(
cfg.clone(),
Arc::clone(&blob),
Arc::clone(&metrics),
Arc::clone(&cpu_heavy_runtime),
req,
writer_id,
)
.instrument(compact_span),
)
.instrument(compact_span),
)
.await
.map_err(|err| anyhow!(err));
.map_err(|e| anyhow!(e)),
)
.await;

let res = match res {
Ok(res) => res,
Err(err) => {
metrics.compaction.timed_out.inc();
Err(anyhow!(err))
}
};

metrics
.compaction
Expand Down
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ pub struct CompactionMetrics {
pub(crate) skipped: IntCounter,
pub(crate) started: IntCounter,
pub(crate) applied: IntCounter,
pub(crate) timed_out: IntCounter,
pub(crate) failed: IntCounter,
pub(crate) noop: IntCounter,
pub(crate) seconds: Counter,
Expand Down Expand Up @@ -642,6 +643,10 @@ impl CompactionMetrics {
name: "mz_persist_compaction_applied",
help: "count of compactions applied to state",
)),
timed_out: registry.register(metric!(
name: "mz_persist_compaction_timed_out",
help: "count of compactions that timed out",
)),
noop: registry.register(metric!(
name: "mz_persist_compaction_noop",
help: "count of compactions discarded (obsolete)",
Expand Down
8 changes: 7 additions & 1 deletion src/persist-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ impl ShardId {
}
}

pub(crate) const MB: usize = 1024 * 1024;

/// The tunable knobs for persist.
#[derive(Debug, Clone)]
pub struct PersistConfig {
Expand Down Expand Up @@ -217,6 +219,10 @@ pub struct PersistConfig {
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
/// compaction requests that can execute for a given shard.
pub compaction_concurrency_limit: usize,
/// In Compactor::compact_and_apply_background, the minimum amount of time to
/// allow a compaction request to run before timing it out. A request may be
/// given a timeout greater than this value depending on the inputs' size.
pub compaction_minimum_timeout: Duration,
/// The maximum size of the connection pool to Postgres/CRDB when performing
/// consensus reads and writes.
pub consensus_connection_pool_max_size: usize,
Expand Down Expand Up @@ -275,7 +281,6 @@ impl PersistConfig {
pub fn new(build_info: &BuildInfo, now: NowFn) -> Self {
// Escape hatch in case we need to disable compaction.
let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");
const MB: usize = 1024 * 1024;
Self {
build_version: build_info.semver_version(),
now,
Expand All @@ -286,6 +291,7 @@ impl PersistConfig {
compaction_heuristic_min_inputs: 8,
compaction_heuristic_min_updates: 1024,
compaction_concurrency_limit: 5,
compaction_minimum_timeout: Duration::from_secs(90),
consensus_connection_pool_max_size: 50,
writer_lease_duration: Duration::from_secs(60 * 15),
reader_lease_duration: Self::DEFAULT_READ_LEASE_DURATION,
Expand Down

0 comments on commit c333062

Please sign in to comment.