4 changes: 2 additions & 2 deletions src/adapter-types/src/dyncfgs.rs
@@ -55,8 +55,8 @@ pub const WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG: Config<Duration> = Config::new(
 
 pub const WITH_0DT_CAUGHT_UP_CHECK_CUTOFF: Config<Duration> = Config::new(
     "with_0dt_caught_up_check_cutoff",
-    Duration::from_secs(10 * 60), // 10 minutes
-    "During a 0dt deployment, if a cluster has only 'problematic' (crash-looping) replicas _and_ any collection that is behind by more than this cutoff, the cluster will be ignored in caught-up checks.",
+    Duration::from_secs(2 * 60 * 60), // 2 hours
Contributor Author:
Changed this back to the old default, since cutting over too soon is worse than cutting over too late. A value between the two might work as well, but I'm not sure how to choose one.

+    "Collections whose write frontier is behind 'now' by more than the cutoff are ignored when doing caught-up checks for 0dt deployments.",
 );

 pub const ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK: Config<bool> = Config::new(
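For intuition about what this config controls: in the caught-up check, the cutoff turns into a comparison of a collection's "live" write frontier against wall-clock time. A minimal sketch of that test, using plain millisecond timestamps as a stand-in for the real `Timestamp` type (the names here are illustrative, not from the codebase):

```rust
/// Sketch: a collection is ignored by the caught-up check when the leader's
/// ("live") write frontier lags wall-clock time by more than the cutoff.
fn beyond_cutoff(live_write_frontier_ms: u64, now_ms: u64, cutoff_ms: u64) -> bool {
    now_ms.saturating_sub(live_write_frontier_ms) > cutoff_ms
}

fn main() {
    let now: u64 = 10_000_000;
    let cutoff: u64 = 2 * 60 * 60 * 1000; // 2 hours, the default restored above
    assert!(beyond_cutoff(now - cutoff - 1, now, cutoff)); // just past the cutoff: ignored
    assert!(!beyond_cutoff(now - 1_000, now, cutoff)); // lagging, but still considered
}
```

With that picture, the tradeoff in the comment above is: a small cutoff risks ignoring collections that are merely slow (cutting over too soon), while a large one makes the check wait on collections that will never catch up (cutting over too late).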
59 changes: 34 additions & 25 deletions src/adapter/src/coord/caught_up.rs
@@ -145,7 +145,7 @@ impl Coordinator {
             BTreeSet::new()
         };
 
-        let compute_caught_up = self
+        let caught_up = self
             .clusters_caught_up(
                 allowed_lag.into(),
                 cutoff.into(),
@@ -156,26 +156,32 @@
             )
             .await;
 
-        tracing::info!(%compute_caught_up, "checked caught-up status of collections");
+        tracing::info!(%caught_up, "checked caught-up status of collections");
 
-        if compute_caught_up {
+        if caught_up {
             let ctx = self.caught_up_check.take().expect("known to exist");
             ctx.trigger.fire();
         }
     }
 
-    /// Returns `true` if all non-transient, non-excluded collections have their write
-    /// frontier (aka. upper) within `allowed_lag` of the "live" frontier
-    /// reported in `live_frontiers`. The "live" frontiers are frontiers as
-    /// reported by a currently running `environmentd` deployment, during a 0dt
-    /// upgrade.
+    /// Returns whether all clusters are considered caught-up.
     ///
-    /// Collections whose write frontier is behind `now` by more than the cutoff
-    /// are ignored.
+    /// Informally, a cluster is considered caught-up if it is at least as healthy as its
+    /// counterpart in the leader environment. To determine that, we use the following rules:
     ///
-    /// For this check, zero-replica clusters are always considered caught up.
-    /// Their collections would never normally be considered caught up but it's
-    /// clearly intentional that they have no replicas.
+    /// (1) A cluster is caught-up if all non-transient, non-excluded collections installed on it
+    ///     are either caught-up or ignored.
+    /// (2) A collection is caught-up when it is (a) hydrated and (b) its write frontier is within
+    ///     `allowed_lag` of the "live" frontier, the collection's frontier reported by the leader
+    ///     environment.
+    /// (3) A collection is ignored if its "live" frontier is behind `now` by more than `cutoff`.
+    ///     Such a collection is unhealthy in the leader environment, so we don't care about its
+    ///     health in the read-only environment either.
+    /// (4) On a cluster that is crash-looping, all collections are ignored.
+    ///
+    /// For this check, zero-replica clusters are always considered caught up. Their collections
+    /// would never normally be considered caught up but it's clearly intentional that they have no
+    /// replicas.
     async fn clusters_caught_up(
         &self,
         allowed_lag: Timestamp,
@@ -219,17 +225,9 @@ impl Coordinator {
         result
     }
 
-    /// Returns `true` if all non-transient, non-excluded collections have their write
-    /// frontier (aka. upper) within `allowed_lag` of the "live" frontier
-    /// reported in `live_frontiers`. The "live" frontiers are frontiers as
-    /// reported by a currently running `environmentd` deployment, during a 0dt
-    /// upgrade.
-    ///
-    /// Collections whose write frontier is behind `now` by more than the cutoff
-    /// are ignored.
+    /// Returns whether the given cluster is considered caught-up.
     ///
-    /// This also returns `true` in case this cluster does not have any
-    /// replicas.
+    /// See [`Coordinator::clusters_caught_up`] for details.
     async fn collections_caught_up(
         &self,
         cluster: &Cluster,
@@ -245,7 +243,7 @@
         }
 
         // Check if all replicas in this cluster are crash/OOM-looping. As long
-        // as there is at least one healthy replica, the cluster is okay-ish. 
+        // as there is at least one healthy replica, the cluster is okay-ish.
         let cluster_has_only_problematic_replicas = cluster
             .replicas()
             .all(|replica| problematic_replicas.contains(&replica.replica_id));
@@ -318,10 +316,21 @@ impl Coordinator {
                     "live write frontier of collection {id} is too far behind 'now'"
                 );
                 tracing::info!(
-                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one collection that is too far behind 'now', ignoring cluster for caught-up checks",
+                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one \
+                     collection that is too far behind 'now'; ignoring cluster for caught-up \
+                     checks",
                     cluster.id
                 );
                 return Ok(true);
+            } else if beyond_all_hope {
+                tracing::info!(
+                    ?live_write_frontier,
+                    ?cutoff,
+                    ?now,
+                    "live write frontier of collection {id} is too far behind 'now'; \
+                     ignoring for caught-up checks"
+                );
+                continue;
             }
 
             // We can't do easy comparisons and subtractions, so we bump up the
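Read together, rules (1)-(4) in the new doc comment amount to roughly the following decision procedure. This is a simplified sketch, not the actual implementation: plain structs and millisecond timestamps stand in for the real `Coordinator`, controller state, and frontier types, and per-collection exclusions are omitted:

```rust
// Illustrative stand-ins, not the real Materialize types.
struct Collection {
    hydrated: bool,
    write_frontier: u64,      // this environment's upper, in ms
    live_write_frontier: u64, // frontier reported by the leader environment
}

struct Cluster {
    replica_count: usize,
    crash_looping: bool, // all replicas crash/OOM-looping
    collections: Vec<Collection>,
}

/// Rule (1): a cluster is caught-up if every collection on it is either
/// caught-up or ignored.
fn cluster_caught_up(cluster: &Cluster, allowed_lag: u64, cutoff: u64, now: u64) -> bool {
    // Zero-replica clusters are always considered caught up; it's clearly
    // intentional that they have no replicas.
    if cluster.replica_count == 0 {
        return true;
    }
    cluster.collections.iter().all(|c| {
        // Rule (4): on a crash-looping cluster, collections are ignored.
        if cluster.crash_looping {
            return true;
        }
        // Rule (3): ignore collections whose "live" frontier is more than
        // `cutoff` behind `now`; they are unhealthy in the leader too.
        if now.saturating_sub(c.live_write_frontier) > cutoff {
            return true;
        }
        // Rule (2): caught-up means hydrated and within `allowed_lag` of
        // the "live" frontier.
        c.hydrated && c.live_write_frontier.saturating_sub(c.write_frontier) <= allowed_lag
    })
}

fn main() {
    let cluster = Cluster {
        replica_count: 1,
        crash_looping: false,
        collections: vec![Collection {
            hydrated: true,
            write_frontier: 9_500,
            live_write_frontier: 10_000,
        }],
    };
    // Hydrated and only 500ms behind the live frontier: caught up.
    assert!(cluster_caught_up(&cluster, 1_000, 7_200_000, 10_000));
}
```

One subtlety visible in the last hunk above: the actual code only skips a crash-looping cluster when at least one of its collections is also beyond the cutoff, which is slightly stricter than rule (4) as sketched here.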
32 changes: 32 additions & 0 deletions test/0dt/mzcompose.py
@@ -2745,3 +2745,35 @@ def workflow_ddl(c: Composition) -> None:
         """
         )
     )
+
+
+def workflow_stuck_collection(c: Composition) -> None:
+    """Verify that stuck collections don't hold up a 0dt deployment."""
+
+    c.down(destroy_volumes=True)
+
+    c.up("mz_old")
+
+    c.sql(
+        "ALTER SYSTEM SET with_0dt_caught_up_check_cutoff = '10s'",
+        service="mz_old",
+        port=6877,
+        user="mz_system",
+    )
+
+    c.sql(
+        """
+        CREATE CLUSTER paused SIZE 'scale=1,workers=1', REPLICATION FACTOR 0;
+        CREATE CLUSTER test SIZE 'scale=1,workers=1';
+
+        CREATE TABLE t (a int);
+        CREATE MATERIALIZED VIEW mv IN CLUSTER paused AS SELECT * FROM t;
+        CREATE INDEX idx IN CLUSTER test ON mv (a);
+        """,
+        service="mz_old",
+    )
+
+    c.up("mz_new")
+    c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
+    c.promote_mz("mz_new")
+    c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, "mz_new", sleep_time=None)
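To see why this exercises the new cutoff behavior: `mv` is installed on the zero-replica `paused` cluster, so its write frontier never advances, and the dependent index `idx` on the healthy `test` cluster is held back with it. With the cutoff lowered to `10s`, the caught-up check soon ignores the stuck collections, so `mz_new` must still become ready to promote rather than waiting forever. Assuming the usual mzcompose conventions, the workflow would be run from `test/0dt` with something like `./mzcompose run stuck-collection`.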