persist: run the heartbeat_read task on two tokio runtimes #21873

Merged 1 commit on Sep 21, 2023
108 changes: 69 additions & 39 deletions src/persist-client/src/internal/machine.rs
@@ -886,55 +886,85 @@ where
D: Semigroup + Codec64 + Send + Sync,
{
#[allow(clippy::unused_async)]
pub async fn start_reader_heartbeat_task(
pub async fn start_reader_heartbeat_tasks(
self,
reader_id: LeasedReaderId,
gc: GarbageCollector<K, V, T, D>,
) -> JoinHandle<()> {
let mut machine = self;
let isolated_runtime = Arc::clone(&machine.isolated_runtime);
) -> Vec<JoinHandle<()>> {
let mut ret = Vec::new();

// TODO: In response to a production incident, this runs the heartbeat
// task on both the in-context tokio runtime and persist's isolated
// runtime. We think we were seeing tasks (including this one) get stuck
// indefinitely in tokio while waiting for a runtime worker. This could
// happen if some other task in that runtime never yields. It's possible
// that one of the two runtimes is healthy while the other isn't (this
// was inconclusive in the incident debugging), and the heartbeat task
// is fairly lightweight, so run a copy in each in case that helps.
//
// The real fix here is to find the misbehaving task and fix it. Remove
// this duplication when that happens.
let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
ret.push(mz_ore::task::spawn(|| name, {
let machine = self.clone();
let reader_id = reader_id.clone();
let gc = gc.clone();
Self::reader_heartbeat_task(machine, reader_id, gc)
}));

let isolated_runtime = Arc::clone(&self.isolated_runtime);
let name = format!(
"persist::heartbeat_read({},{})",
machine.shard_id(),
"persist::heartbeat_read_isolated({},{})",
self.shard_id(),
reader_id
);
isolated_runtime.spawn_named(|| name, async move {
let sleep_duration = machine.applier.cfg.dynamic.reader_lease_duration() / 2;
loop {
let before_sleep = Instant::now();
tokio::time::sleep(sleep_duration).await;

let elapsed_since_before_sleeping = before_sleep.elapsed();
if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
warn!(
"reader ({}) of shard ({}) went {}s between heartbeats",
reader_id,
machine.shard_id(),
elapsed_since_before_sleeping.as_secs_f64()
);
}
ret.push(
isolated_runtime.spawn_named(|| name, Self::reader_heartbeat_task(self, reader_id, gc)),
);

let before_heartbeat = Instant::now();
let (_seqno, existed, maintenance) = machine
.heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
.await;
maintenance.start_performing(&machine, &gc);
ret
}

let elapsed_since_heartbeat = before_heartbeat.elapsed();
if elapsed_since_heartbeat > Duration::from_secs(60) {
warn!(
"reader ({}) of shard ({}) heartbeat call took {}s",
reader_id,
machine.shard_id(),
elapsed_since_heartbeat.as_secs_f64(),
);
}
async fn reader_heartbeat_task(
mut machine: Self,
reader_id: LeasedReaderId,
gc: GarbageCollector<K, V, T, D>,
) {
let sleep_duration = machine.applier.cfg.dynamic.reader_lease_duration() / 2;
loop {
let before_sleep = Instant::now();
tokio::time::sleep(sleep_duration).await;

if !existed {
return;
}
let elapsed_since_before_sleeping = before_sleep.elapsed();
if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
warn!(
"reader ({}) of shard ({}) went {}s between heartbeats",
reader_id,
machine.shard_id(),
elapsed_since_before_sleeping.as_secs_f64()
);
}
})

let before_heartbeat = Instant::now();
let (_seqno, existed, maintenance) = machine
.heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
.await;
maintenance.start_performing(&machine, &gc);

let elapsed_since_heartbeat = before_heartbeat.elapsed();
if elapsed_since_heartbeat > Duration::from_secs(60) {
warn!(
"reader ({}) of shard ({}) heartbeat call took {}s",
reader_id,
machine.shard_id(),
elapsed_since_heartbeat.as_secs_f64(),
);
}

if !existed {
return;
}
}
}
}
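
The TODO in the diff above explains the mitigation: the heartbeat future is now spawned twice, once on the caller's in-context tokio runtime via mz_ore::task::spawn and once on persist's isolated runtime via spawn_named, with both JoinHandles returned to the caller. Below is a minimal standalone sketch of that dual-runtime pattern using plain tokio; the names (heartbeat_loop, start_heartbeat_tasks), the one-second interval, and the shard/reader strings are illustrative stand-ins, not the persist types.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::{Handle, Runtime};
use tokio::task::JoinHandle;

// Illustrative stand-in for the real heartbeat loop.
async fn heartbeat_loop(shard: String, reader: String) {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("heartbeat: reader {reader} of shard {shard}");
    }
}

// Spawn one copy of the loop on each runtime and hand back both handles,
// mirroring the Vec<JoinHandle<()>> return type in the diff.
fn start_heartbeat_tasks(
    ambient: &Handle,
    isolated: &Arc<Runtime>,
    shard: String,
    reader: String,
) -> Vec<JoinHandle<()>> {
    vec![
        // One copy on the in-context runtime...
        ambient.spawn(heartbeat_loop(shard.clone(), reader.clone())),
        // ...and one on the isolated runtime, in case the other is wedged.
        isolated.spawn(heartbeat_loop(shard, reader)),
    ]
}

fn main() {
    let ambient = Runtime::new().expect("runtime");
    let isolated = Arc::new(Runtime::new().expect("runtime"));
    let tasks = start_heartbeat_tasks(ambient.handle(), &isolated, "s0".into(), "r0".into());
    // Let the heartbeats tick a few times, then shut everything down.
    std::thread::sleep(Duration::from_secs(3));
    for task in tasks {
        task.abort();
    }
}
```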

12 changes: 7 additions & 5 deletions src/persist-client/src/lib.rs
@@ -1983,14 +1983,16 @@ mod tests {
.expect("client construction failed")
.expect_open::<(), (), u64, i64>(ShardId::new())
.await;
let read_heartbeat_task = read
.heartbeat_task
let read_heartbeat_tasks = read
.heartbeat_tasks
.take()
.expect("handle should have heartbeat task");
read.expire().await;
let () = read_heartbeat_task
.await
.expect("task should shutdown cleanly");
for read_heartbeat_task in read_heartbeat_tasks {
let () = read_heartbeat_task
.await
.expect("task should shutdown cleanly");
}
}

/// Regression test for 16743, where the nightly tests found that calling
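
The updated test relies on each copy of the heartbeat loop returning once the reader lease no longer exists, so awaiting every JoinHandle completes cleanly after expire(). Here is a toy illustration of that shutdown shape, not the persist API: the expired flag stands in for the `if !existed { return; }` check in the real loop.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let expired = Arc::new(AtomicBool::new(false));
    let mut tasks = Vec::new();
    // Two copies, one per runtime in the real change.
    for _ in 0..2 {
        let expired = Arc::clone(&expired);
        tasks.push(tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(10)).await;
                // Analogous to the heartbeat loop's `if !existed { return; }`.
                if expired.load(Ordering::SeqCst) {
                    return;
                }
            }
        }));
    }
    // Simulate expiring the reader, then await both copies of the task.
    expired.store(true, Ordering::SeqCst);
    for task in tasks {
        let () = task.await.expect("task should shutdown cleanly");
    }
}
```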
10 changes: 6 additions & 4 deletions src/persist-client/src/read.rs
@@ -540,7 +540,7 @@ where
explicitly_expired: bool,
lease_returner: SubscriptionLeaseReturner,

pub(crate) heartbeat_task: Option<JoinHandle<()>>,
pub(crate) heartbeat_tasks: Option<Vec<JoinHandle<()>>>,
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
@@ -577,7 +577,7 @@ where
reader_id: reader_id.clone(),
metrics,
},
heartbeat_task: Some(machine.start_reader_heartbeat_task(reader_id, gc).await),
heartbeat_tasks: Some(machine.start_reader_heartbeat_tasks(reader_id, gc).await),
}
}

Expand Down Expand Up @@ -1238,8 +1238,10 @@ where
D: Semigroup + Codec64 + Send + Sync,
{
fn drop(&mut self) {
if let Some(heartbeat_task) = self.heartbeat_task.take() {
heartbeat_task.abort();
if let Some(heartbeat_tasks) = self.heartbeat_tasks.take() {
for heartbeat_task in heartbeat_tasks {
heartbeat_task.abort();
}
}
if self.explicitly_expired {
return;
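
On the Drop side, the handle now has to abort a vector of tasks instead of a single one. A hypothetical minimal version of that cleanup is sketched below; ToyReadHandle is an illustrative type, not the real ReadHandle, and abort() only requests cancellation of each copy of the loop.

```rust
use tokio::task::JoinHandle;

// Hypothetical toy handle (not the real ReadHandle): dropping it aborts every
// outstanding copy of the heartbeat task, of which there are now two.
struct ToyReadHandle {
    heartbeat_tasks: Option<Vec<JoinHandle<()>>>,
}

impl Drop for ToyReadHandle {
    fn drop(&mut self) {
        // take() ensures the handles are aborted at most once.
        if let Some(heartbeat_tasks) = self.heartbeat_tasks.take() {
            for heartbeat_task in heartbeat_tasks {
                // Request cancellation of this copy of the heartbeat loop.
                heartbeat_task.abort();
            }
        }
    }
}
```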