Skip to content

Commit 25b3ef1

Browse files
authored
Fix cache invalidation when cycle head becomes non-head (#1014)
* Fix cache invalidation when cycle head becomes non-head * Discard changes to src/function/fetch.rs * Inline comment
1 parent d38145c commit 25b3ef1

File tree

6 files changed

+73
-33
lines changed

6 files changed

+73
-33
lines changed

src/cycle.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ impl CycleHeads {
346346

347347
if *removed {
348348
*removed = false;
349+
existing.iteration_count.store_mut(iteration_count);
349350

350351
true
351352
} else {
@@ -468,15 +469,25 @@ pub(crate) fn empty_cycle_heads() -> &'static CycleHeads {
468469
EMPTY_CYCLE_HEADS.get_or_init(|| CycleHeads(ThinVec::new()))
469470
}
470471

471-
#[derive(Debug, PartialEq, Eq)]
472-
pub enum ProvisionalStatus {
472+
#[derive(Debug)]
473+
pub enum ProvisionalStatus<'db> {
473474
Provisional {
474475
iteration: IterationCount,
475476
verified_at: Revision,
477+
cycle_heads: &'db CycleHeads,
476478
},
477479
Final {
478480
iteration: IterationCount,
479481
verified_at: Revision,
480482
},
481483
FallbackImmediate,
482484
}
485+
486+
impl<'db> ProvisionalStatus<'db> {
487+
pub(crate) fn cycle_heads(&self) -> &'db CycleHeads {
488+
match self {
489+
ProvisionalStatus::Provisional { cycle_heads, .. } => cycle_heads,
490+
_ => empty_cycle_heads(),
491+
}
492+
}
493+
}

src/function.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@ use std::ptr::NonNull;
77
use std::sync::atomic::Ordering;
88
use std::sync::OnceLock;
99

10-
use crate::cycle::{
11-
empty_cycle_heads, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy, IterationCount,
12-
ProvisionalStatus,
13-
};
10+
use crate::cycle::{CycleRecoveryAction, CycleRecoveryStrategy, IterationCount, ProvisionalStatus};
1411
use crate::database::RawDatabase;
1512
use crate::function::delete::DeletedEntries;
1613
use crate::hash::{FxHashSet, FxIndexSet};
@@ -357,7 +354,11 @@ where
357354
///
358355
/// Otherwise, the value is still provisional. For both final and provisional, it also
359356
/// returns the iteration in which this memo was created (always 0 except for cycle heads).
360-
fn provisional_status(&self, zalsa: &Zalsa, input: Id) -> Option<ProvisionalStatus> {
357+
fn provisional_status<'db>(
358+
&self,
359+
zalsa: &'db Zalsa,
360+
input: Id,
361+
) -> Option<ProvisionalStatus<'db>> {
361362
let memo =
362363
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))?;
363364

@@ -377,6 +378,7 @@ where
377378
ProvisionalStatus::Provisional {
378379
iteration,
379380
verified_at: memo.verified_at.load(),
381+
cycle_heads: memo.cycle_heads(),
380382
}
381383
})
382384
}
@@ -416,12 +418,6 @@ where
416418
self.sync_table.mark_as_transfer_target(key_index)
417419
}
418420

419-
fn cycle_heads<'db>(&self, zalsa: &'db Zalsa, input: Id) -> &'db CycleHeads {
420-
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))
421-
.map(|memo| memo.cycle_heads())
422-
.unwrap_or(empty_cycle_heads())
423-
}
424-
425421
/// Attempts to claim `key_index` without blocking.
426422
///
427423
/// * [`WaitForResult::Running`] if the `key_index` is running on another thread. It's up to the caller to block on the other thread

src/function/execute.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,11 @@ where
248248
let ingredient =
249249
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
250250

251-
for nested_head in
252-
ingredient.cycle_heads(zalsa, head.database_key_index.key_index())
253-
{
251+
let provisional_status = ingredient
252+
.provisional_status(zalsa, head.database_key_index.key_index())
253+
.expect("cycle head memo must have been created during the execution");
254+
255+
for nested_head in provisional_status.cycle_heads() {
254256
let nested_as_tuple = (
255257
nested_head.database_key_index,
256258
nested_head.iteration_count.load(),
@@ -442,6 +444,8 @@ where
442444

443445
// Update the iteration count of this cycle head, but only after restoring
444446
// the cycle heads array (or this becomes a no-op).
447+
// We don't call the same method on `cycle_heads` because that one doens't update
448+
// the `memo.iteration_count`
445449
completed_query.revisions.set_cycle_heads(cycle_heads);
446450
completed_query
447451
.revisions

src/function/maybe_changed_after.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,9 @@ where
484484

485485
// Always return `false` for cycle initial values "unless" they are running in the same thread.
486486
if cycle_heads
487-
.iter()
488-
.all(|head| head.database_key_index == memo_database_key_index)
487+
.iter_not_eq(memo_database_key_index)
488+
.next()
489+
.is_none()
489490
{
490491
// SAFETY: We do not access the query stack reentrantly.
491492
let on_stack = unsafe {
@@ -508,6 +509,8 @@ where
508509
head_iteration_count,
509510
memo_iteration_count: current_iteration_count,
510511
verified_at: head_verified_at,
512+
cycle_heads,
513+
database_key_index: head_database_key,
511514
} => {
512515
if head_verified_at != memo_verified_at {
513516
return false;
@@ -516,6 +519,27 @@ where
516519
if head_iteration_count != current_iteration_count {
517520
return false;
518521
}
522+
523+
// Check if the memo is still a cycle head and hasn't changed
524+
// to a normal cycle participant. This is to force re-execution in
525+
// a scenario like this:
526+
//
527+
// * There's a nested cycle with the outermost query A
528+
// * B participates in the cycle and is a cycle head in the first few iterations
529+
// * B becomes a non-cycle head in a later iteration
530+
// * There's a query `C` that has `B` as its cycle head
531+
//
532+
// The crucial point is that `B` switches from being a cycle head to being a regular cycle participant.
533+
// The issue with that is that `A` doesn't update `B`'s `iteration_count `when the iteration completes
534+
// because it only does that for cycle heads (and collecting all queries participating in a query would be sort of expensive?).
535+
//
536+
// When we now pull `C` in a later iteration, `validate_same_iteration` iterates over all its cycle heads (`B`),
537+
// and check if the iteration count still matches. Which is the case because `A` didn't update `B`'s iteration count.
538+
//
539+
// That's why we also check if `B` is still a cycle head in the current iteration.
540+
if !cycle_heads.contains(&head_database_key) {
541+
return false;
542+
}
519543
}
520544
_ => {
521545
return false;

src/function/memo.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -409,9 +409,11 @@ mod persistence {
409409
pub(super) enum TryClaimHeadsResult<'me> {
410410
/// Claiming the cycle head results in a cycle.
411411
Cycle {
412+
database_key_index: DatabaseKeyIndex,
412413
head_iteration_count: IterationCount,
413414
memo_iteration_count: IterationCount,
414415
verified_at: Revision,
416+
cycle_heads: &'me CycleHeads,
415417
},
416418

417419
/// The cycle head is not finalized, but it can be claimed.
@@ -458,23 +460,28 @@ impl<'me> Iterator for TryClaimCycleHeadsIter<'me> {
458460
let provisional_status = ingredient
459461
.provisional_status(self.zalsa, head_key_index)
460462
.expect("cycle head memo to exist");
461-
let (current_iteration_count, verified_at) = match provisional_status {
463+
let (current_iteration_count, verified_at, cycle_heads) = match provisional_status {
462464
ProvisionalStatus::Provisional {
463465
iteration,
464466
verified_at,
465-
}
466-
| ProvisionalStatus::Final {
467+
cycle_heads,
468+
} => (iteration, verified_at, cycle_heads),
469+
ProvisionalStatus::Final {
467470
iteration,
468471
verified_at,
469-
} => (iteration, verified_at),
470-
ProvisionalStatus::FallbackImmediate => {
471-
(IterationCount::initial(), self.zalsa.current_revision())
472-
}
472+
} => (iteration, verified_at, empty_cycle_heads()),
473+
ProvisionalStatus::FallbackImmediate => (
474+
IterationCount::initial(),
475+
self.zalsa.current_revision(),
476+
empty_cycle_heads(),
477+
),
473478
};
474479

475480
Some(TryClaimHeadsResult::Cycle {
481+
database_key_index: head_database_key,
476482
memo_iteration_count: current_iteration_count,
477483
head_iteration_count: head.iteration_count.load(),
484+
cycle_heads,
478485
verified_at,
479486
})
480487
}

src/ingredient.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::any::{Any, TypeId};
22
use std::fmt;
33

4-
use crate::cycle::{empty_cycle_heads, CycleHeads, IterationCount, ProvisionalStatus};
4+
use crate::cycle::{IterationCount, ProvisionalStatus};
55
use crate::database::RawDatabase;
66
use crate::function::{VerifyCycleHeads, VerifyResult};
77
use crate::hash::{FxHashSet, FxIndexSet};
@@ -74,16 +74,14 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync {
7474
/// Is it a provisional value or has it been finalized and in which iteration.
7575
///
7676
/// Returns `None` if `input` doesn't exist.
77-
fn provisional_status(&self, _zalsa: &Zalsa, _input: Id) -> Option<ProvisionalStatus> {
77+
fn provisional_status<'db>(
78+
&self,
79+
_zalsa: &'db Zalsa,
80+
_input: Id,
81+
) -> Option<ProvisionalStatus<'db>> {
7882
unreachable!("provisional_status should only be called on cycle heads and only functions can be cycle heads");
7983
}
8084

81-
/// Returns the cycle heads for this ingredient.
82-
fn cycle_heads<'db>(&self, zalsa: &'db Zalsa, input: Id) -> &'db CycleHeads {
83-
_ = (zalsa, input);
84-
empty_cycle_heads()
85-
}
86-
8785
/// Invoked when the current thread needs to wait for a result for the given `key_index`.
8886
/// This call doesn't block the current thread. Instead, it's up to the caller to block
8987
/// in case `key_index` is [running](`WaitForResult::Running`) on another thread.

0 commit comments

Comments
 (0)