Skip to content

Commit 7c090ba

Browse files
committed
Run nested cycles in a single fixpoint iteration
Fix serde attribute
1 parent 5330dd9 commit 7c090ba

15 files changed

+957
-339
lines changed

src/active_query.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ impl ActiveQuery {
225225
active_tracked_structs,
226226
mem::take(cycle_heads),
227227
iteration_count,
228+
false,
228229
);
229230

230231
let revisions = QueryRevisions {
@@ -498,7 +499,7 @@ impl fmt::Display for Backtrace {
498499
if full {
499500
write!(fmt, " -> ({changed_at:?}, {durability:#?}")?;
500501
if !cycle_heads.is_empty() || !iteration_count.is_initial() {
501-
write!(fmt, ", iteration = {iteration_count:?}")?;
502+
write!(fmt, ", iteration = {iteration_count}")?;
502503
}
503504
write!(fmt, ")")?;
504505
}
@@ -517,7 +518,7 @@ impl fmt::Display for Backtrace {
517518
}
518519
write!(
519520
fmt,
520-
"{:?} -> {:?}",
521+
"{:?} -> {}",
521522
head.database_key_index, head.iteration_count
522523
)?;
523524
}

src/cycle.rs

Lines changed: 193 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
use thin_vec::{thin_vec, ThinVec};
5353

5454
use crate::key::DatabaseKeyIndex;
55+
use crate::sync::atomic::{AtomicBool, AtomicU8, Ordering};
5556
use crate::sync::OnceLock;
5657
use crate::Revision;
5758

@@ -96,14 +97,26 @@ pub enum CycleRecoveryStrategy {
9697
/// would be the cycle head. It returns an "initial value" when the cycle is encountered (if
9798
/// fixpoint iteration is enabled for that query), and then is responsible for re-iterating the
9899
/// cycle until it converges.
99-
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
100+
#[derive(Debug)]
100101
#[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))]
101102
pub struct CycleHead {
102103
pub(crate) database_key_index: DatabaseKeyIndex,
103-
pub(crate) iteration_count: IterationCount,
104+
pub(crate) iteration_count: AtomicIterationCount,
105+
#[cfg_attr(feature = "persistence", serde(skip))]
106+
removed: AtomicBool,
107+
}
108+
109+
impl Clone for CycleHead {
110+
fn clone(&self) -> Self {
111+
Self {
112+
database_key_index: self.database_key_index,
113+
iteration_count: self.iteration_count.load().into(),
114+
removed: self.removed.load(Ordering::Relaxed).into(),
115+
}
116+
}
104117
}
105118

106-
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
119+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default, PartialOrd, Ord)]
107120
#[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))]
108121
#[cfg_attr(feature = "persistence", serde(transparent))]
109122
pub struct IterationCount(u8);
@@ -131,11 +144,65 @@ impl IterationCount {
131144
}
132145
}
133146

147+
impl std::fmt::Display for IterationCount {
148+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149+
write!(f, "iteration={}", self.0)
150+
}
151+
}
152+
153+
#[derive(Debug)]
154+
pub(crate) struct AtomicIterationCount(AtomicU8);
155+
156+
impl AtomicIterationCount {
157+
pub(crate) fn load(&self) -> IterationCount {
158+
IterationCount(self.0.load(Ordering::Relaxed))
159+
}
160+
161+
pub(crate) fn store(&self, value: IterationCount) {
162+
self.0.store(value.0, Ordering::Release);
163+
}
164+
165+
pub(crate) fn store_mut(&mut self, value: IterationCount) {
166+
*self.0.get_mut() = value.0;
167+
}
168+
}
169+
170+
impl std::fmt::Display for AtomicIterationCount {
171+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172+
self.load().fmt(f)
173+
}
174+
}
175+
176+
impl From<IterationCount> for AtomicIterationCount {
177+
fn from(iteration_count: IterationCount) -> Self {
178+
AtomicIterationCount(iteration_count.0.into())
179+
}
180+
}
181+
182+
#[cfg(feature = "persistence")]
183+
impl serde::Serialize for AtomicIterationCount {
184+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
185+
where
186+
S: serde::Serializer,
187+
{
188+
self.load().serialize(serializer)
189+
}
190+
}
191+
192+
#[cfg(feature = "persistence")]
193+
impl<'de> serde::Deserialize<'de> for AtomicIterationCount {
194+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
195+
where
196+
D: serde::Deserializer<'de>,
197+
{
198+
IterationCount::deserialize(deserializer).map(Into::into)
199+
}
200+
}
201+
134202
/// Any provisional value generated by any query in a cycle will track the cycle head(s) (can be
135203
/// plural in case of nested cycles) representing the cycles it is part of, and the current
136204
/// iteration count for each cycle head. This struct tracks these cycle heads.
137205
#[derive(Clone, Debug, Default)]
138-
#[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))]
139206
pub struct CycleHeads(ThinVec<CycleHead>);
140207

141208
impl CycleHeads {
@@ -146,30 +213,34 @@ impl CycleHeads {
146213
pub(crate) fn initial(database_key_index: DatabaseKeyIndex) -> Self {
147214
Self(thin_vec![CycleHead {
148215
database_key_index,
149-
iteration_count: IterationCount::initial(),
216+
iteration_count: IterationCount::initial().into(),
217+
removed: false.into()
150218
}])
151219
}
152220

153-
pub(crate) fn iter(&self) -> std::slice::Iter<'_, CycleHead> {
154-
self.0.iter()
221+
pub(crate) fn iter(&self) -> CycleHeadsIterator<'_> {
222+
CycleHeadsIterator {
223+
inner: self.0.iter(),
224+
}
155225
}
156226

157227
pub(crate) fn contains(&self, value: &DatabaseKeyIndex) -> bool {
158228
self.into_iter()
159229
.any(|head| head.database_key_index == *value)
160230
}
161231

162-
pub(crate) fn remove(&mut self, value: &DatabaseKeyIndex) -> bool {
163-
let found = self
164-
.0
165-
.iter()
166-
.position(|&head| head.database_key_index == *value);
167-
let Some(found) = found else { return false };
168-
self.0.swap_remove(found);
169-
true
232+
pub(crate) fn clear_except(&self, except: DatabaseKeyIndex) {
233+
for head in self.0.iter() {
234+
if head.database_key_index == except {
235+
continue;
236+
}
237+
238+
// TODO: verify ordering
239+
head.removed.store(true, Ordering::Release);
240+
}
170241
}
171242

172-
pub(crate) fn update_iteration_count(
243+
pub(crate) fn update_iteration_count_mut(
173244
&mut self,
174245
cycle_head_index: DatabaseKeyIndex,
175246
new_iteration_count: IterationCount,
@@ -179,7 +250,21 @@ impl CycleHeads {
179250
.iter_mut()
180251
.find(|cycle_head| cycle_head.database_key_index == cycle_head_index)
181252
{
182-
cycle_head.iteration_count = new_iteration_count;
253+
cycle_head.iteration_count.store_mut(new_iteration_count);
254+
}
255+
}
256+
257+
pub(crate) fn update_iteration_count(
258+
&self,
259+
cycle_head_index: DatabaseKeyIndex,
260+
new_iteration_count: IterationCount,
261+
) {
262+
if let Some(cycle_head) = self
263+
.0
264+
.iter()
265+
.find(|cycle_head| cycle_head.database_key_index == cycle_head_index)
266+
{
267+
cycle_head.iteration_count.store(new_iteration_count);
183268
}
184269
}
185270

@@ -188,15 +273,41 @@ impl CycleHeads {
188273
self.0.reserve(other.0.len());
189274

190275
for head in other {
191-
if let Some(existing) = self
192-
.0
193-
.iter()
194-
.find(|candidate| candidate.database_key_index == head.database_key_index)
195-
{
196-
assert_eq!(existing.iteration_count, head.iteration_count);
276+
self.insert(head);
277+
}
278+
}
279+
280+
pub(crate) fn insert(&mut self, head: &CycleHead) -> bool {
281+
if let Some(existing) = self
282+
.0
283+
.iter_mut()
284+
.find(|candidate| candidate.database_key_index == head.database_key_index)
285+
{
286+
let removed = existing.removed.get_mut();
287+
288+
if *removed {
289+
*removed = false;
290+
291+
true
197292
} else {
198-
self.0.push(*head);
293+
let existing_count = existing.iteration_count.load();
294+
let head_count = head.iteration_count.load();
295+
296+
// It's now possible that a query can depend on different iteration counts of the same query
297+
// This because some queries (inner) read the provisional value of the last iteration
298+
// while outer queries read the value from the last iteration (which is i+1 if the head didn't converge).
299+
assert_eq!(
300+
existing_count, head_count,
301+
"Can't merge cycle heads {:?} with different iteration counts ({existing_count:?}, {head_count:?})",
302+
existing.database_key_index
303+
);
304+
305+
false
199306
}
307+
} else {
308+
debug_assert!(!head.removed.load(Ordering::Relaxed));
309+
self.0.push(head.clone());
310+
true
200311
}
201312
}
202313

@@ -206,6 +317,37 @@ impl CycleHeads {
206317
}
207318
}
208319

320+
#[cfg(feature = "persistence")]
321+
impl serde::Serialize for CycleHeads {
322+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
323+
where
324+
S: serde::Serializer,
325+
{
326+
use serde::ser::SerializeSeq;
327+
328+
let mut seq = serializer.serialize_seq(None)?;
329+
for e in self {
330+
if e.removed.load(Ordering::Relaxed) {
331+
continue;
332+
}
333+
334+
seq.serialize_element(e)?;
335+
}
336+
seq.end()
337+
}
338+
}
339+
340+
#[cfg(feature = "persistence")]
341+
impl<'de> serde::Deserialize<'de> for CycleHeads {
342+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
343+
where
344+
D: serde::Deserializer<'de>,
345+
{
346+
let vec: ThinVec<CycleHead> = serde::Deserialize::deserialize(deserializer)?;
347+
Ok(CycleHeads(vec))
348+
}
349+
}
350+
209351
impl IntoIterator for CycleHeads {
210352
type Item = CycleHead;
211353
type IntoIter = <ThinVec<Self::Item> as IntoIterator>::IntoIter;
@@ -215,9 +357,29 @@ impl IntoIterator for CycleHeads {
215357
}
216358
}
217359

360+
pub struct CycleHeadsIterator<'a> {
361+
inner: std::slice::Iter<'a, CycleHead>,
362+
}
363+
364+
impl<'a> Iterator for CycleHeadsIterator<'a> {
365+
type Item = &'a CycleHead;
366+
367+
fn next(&mut self) -> Option<Self::Item> {
368+
loop {
369+
let next = self.inner.next()?;
370+
371+
if next.removed.load(Ordering::Relaxed) {
372+
continue;
373+
}
374+
375+
return Some(next);
376+
}
377+
}
378+
}
379+
218380
impl<'a> std::iter::IntoIterator for &'a CycleHeads {
219381
type Item = &'a CycleHead;
220-
type IntoIter = std::slice::Iter<'a, CycleHead>;
382+
type IntoIter = CycleHeadsIterator<'a>;
221383

222384
fn into_iter(self) -> Self::IntoIter {
223385
self.iter()
@@ -241,28 +403,22 @@ pub enum ProvisionalStatus {
241403
Provisional {
242404
iteration: IterationCount,
243405
verified_at: Revision,
406+
nested: bool,
244407
},
245408
Final {
246409
iteration: IterationCount,
247410
verified_at: Revision,
411+
nested: bool,
248412
},
249413
FallbackImmediate,
250414
}
251415

252416
impl ProvisionalStatus {
253-
pub(crate) const fn iteration(&self) -> Option<IterationCount> {
254-
match self {
255-
ProvisionalStatus::Provisional { iteration, .. } => Some(*iteration),
256-
ProvisionalStatus::Final { iteration, .. } => Some(*iteration),
257-
ProvisionalStatus::FallbackImmediate => None,
258-
}
259-
}
260-
261-
pub(crate) const fn verified_at(&self) -> Option<Revision> {
417+
pub(crate) fn nested(&self) -> bool {
262418
match self {
263-
ProvisionalStatus::Provisional { verified_at, .. } => Some(*verified_at),
264-
ProvisionalStatus::Final { verified_at, .. } => Some(*verified_at),
265-
ProvisionalStatus::FallbackImmediate => None,
419+
ProvisionalStatus::Provisional { nested, .. } => *nested,
420+
ProvisionalStatus::Final { nested, .. } => *nested,
421+
ProvisionalStatus::FallbackImmediate => false,
266422
}
267423
}
268424
}

0 commit comments

Comments
 (0)