consensus: persist AppQC, blocks, and CommitQCs with async persistence #2896
wen-coding wants to merge 40 commits into main from wen/persist_appqc_and_blocks
Conversation
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
Codecov Report ❌ Patch coverage is … Additional details and impacted files:

@@ Coverage Diff @@
##             main    #2896        +/-  ##
===========================================
+ Coverage   58.12%   77.16%    +19.03%
===========================================
  Files        2109       20      -2089
  Lines      173381     1747    -171634
===========================================
- Hits       100780     1348     -99432
+ Misses      63659      259     -63400
+ Partials     8942      140      -8802

Flags with carried forward coverage won't be shown.
Force-pushed from ebf93df to f4a9c1e (Compare)
Extract generic A/B file persistence into a reusable consensus/persist/ sub-package and add block-file persistence for crash-safe availability state recovery.

Changes:
- Move persist.go and persist_test.go into consensus/persist/ (git mv to preserve history), exporting Persister, NewPersister, WriteAndSync, SuffixA, SuffixB.
- Add persist/blocks.go: per-block file persistence using <lane_hex>_<blocknum>.pb files in a blocks/ subdirectory, with load, delete-before, and header-mismatch validation.
- Wire avail.NewState to accept stateDir, create an A/B persister for AppQC and a BlockPersister for signed lane proposals, and restore both on restart (contiguous block runs, queue alignment).
- Update avail/state.go to persist AppQC on prune and delete obsolete block files after each AppQC advance.
- Thread PersistentStateDir from consensus.Config through to avail.NewState.
- Expand the consensus/inner.go doc comment with the full persistence design (what, why, recovery, write behavior, rebroadcasting).
- Move TestRunOutputsPersistErrorPropagates to consensus/inner_test.go for proper package alignment.
- Add comprehensive tests for block persistence (empty dir, multi-lane, corrupt/mismatched skip, DeleteBefore, filename roundtrip).

Ref: sei-protocol/sei-v3#512
Co-authored-by: Cursor <cursoragent@cursor.com>
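The `<lane_hex>_<blocknum>.pb` filename scheme above can be sketched as an encode/parse roundtrip. This is a minimal illustration, not the PR's actual encoding: the hex width, separator handling, and skip-on-mismatch behavior are assumptions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// blockFilename builds a per-block file name in the <lane_hex>_<blocknum>.pb
// layout. The 16-digit hex lane width is an assumption for illustration.
func blockFilename(lane, blockNum uint64) string {
	return fmt.Sprintf("%016x_%d.pb", lane, blockNum)
}

// parseBlockFilename is the inverse; it reports ok=false for names that do
// not match the expected layout (such files would be skipped on load).
func parseBlockFilename(name string) (lane, blockNum uint64, ok bool) {
	base, found := strings.CutSuffix(name, ".pb")
	if !found {
		return 0, 0, false
	}
	laneStr, numStr, found := strings.Cut(base, "_")
	if !found {
		return 0, 0, false
	}
	lane, err := strconv.ParseUint(laneStr, 16, 64)
	if err != nil {
		return 0, 0, false
	}
	blockNum, err = strconv.ParseUint(numStr, 10, 64)
	if err != nil {
		return 0, 0, false
	}
	return lane, blockNum, true
}

func main() {
	name := blockFilename(3, 42)
	lane, num, ok := parseBlockFilename(name)
	fmt.Println(name, lane, num, ok)
}
```

The roundtrip property is what the "filename roundtrip" test in the commit's test list would exercise.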
Move persisted data loading (AppQC deserialization and block loading) into a dedicated function for readability. Co-authored-by: Cursor <cursoragent@cursor.com>
Move block sorting, contiguous-prefix extraction, and gap truncation from avail/inner.go into persist/blocks.go so all disk-recovery logic lives in one place. This isolates storage concerns in the persistence layer, simplifying newInner and preparing for a future storage backend swap. Co-authored-by: Cursor <cursoragent@cursor.com>
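The sort-then-truncate-at-gap recovery described above can be sketched on bare block numbers. The real code operates on decoded block files per lane; this is an illustrative reduction.

```go
package main

import (
	"fmt"
	"sort"
)

// contiguousPrefix sorts block numbers found on disk and truncates at the
// first gap, so only a contiguous run starting at the lowest number is
// restored. A sketch of the load-time behavior, not the PR's exact code.
func contiguousPrefix(nums []uint64) []uint64 {
	if len(nums) == 0 {
		return nil
	}
	sort.Slice(nums, func(i, j int) bool { return nums[i] < nums[j] })
	out := nums[:1]
	for _, n := range nums[1:] {
		if n != out[len(out)-1]+1 {
			break // gap: blocks after it cannot be used contiguously
		}
		out = append(out, n)
	}
	return out
}

func main() {
	// 6 is missing, so 7 and 8 are dropped.
	fmt.Println(contiguousPrefix([]uint64{5, 3, 4, 7, 8})) // [3 4 5]
}
```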
Co-authored-by: Cursor <cursoragent@cursor.com>
PushBlock and ProduceBlock now add blocks to the in-memory queue immediately and send a persist job to a background goroutine via a buffered channel. The background writer fsyncs each block to disk and advances a per-lane blockPersisted cursor under the inner lock. RecvBatch gates on this cursor so votes are only signed for blocks that have been durably written to disk. When persistence is disabled (testing), the cursor is nil and RecvBatch falls back to bq.next. Co-authored-by: Cursor <cursoragent@cursor.com>
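The vote gate described above (sign only for durably written blocks, fall back to bq.next when the cursor is nil) reduces to a small function. Names here are illustrative stand-ins, not the PR's exact API.

```go
package main

import "fmt"

// votableUpTo returns the exclusive upper bound of blocks that may be
// voted on: with persistence enabled, only blocks below the persisted
// cursor; with persistence disabled (testing), the cursor is nil and the
// gate falls back to the queue's next position.
func votableUpTo(queueNext uint64, persistedCursor *uint64) uint64 {
	if persistedCursor == nil {
		return queueNext // persistence disabled: everything queued is votable
	}
	return min(*persistedCursor, queueNext) // never vote past durable blocks
}

func main() {
	cur := uint64(7)
	fmt.Println(votableUpTo(10, &cur)) // gated at 7
	fmt.Println(votableUpTo(10, nil))  // ungated: 10
}
```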
newInner no longer takes a separate persistEnabled bool; loaded != nil already implies persistence is enabled. Tests with loaded data now correctly reflect this. Co-authored-by: Cursor <cursoragent@cursor.com>
blockPersisted is reconstructed from disk on restart, not persisted itself. Move its creation to just above the block restoration loop (past the loaded==nil early return) so the code reads top-down. Co-authored-by: Cursor <cursoragent@cursor.com>
Force-pushed from 05beddb to 2f0bbad (Compare)
Co-authored-by: Cursor <cursoragent@cursor.com>
Move persistCh, persistJob, and the writer loop from avail/State into BlockPersister.Queue + BlockPersister.Run, so callers just call Queue() and the persist layer owns the channel, buffer sizing, and drain loop. Queue blocks with context to avoid holes in the sequential blockPersisted cursor (which would permanently stall voting). Call sites use utils.IgnoreAfterCancel to swallow shutdown errors. Co-authored-by: Cursor <cursoragent@cursor.com>
…l/sei-chain into wen/persist_appqc_and_blocks
Remove redundant loop that explicitly zeroed every lane. Map zero-values handle lanes without loaded blocks; only lanes with blocks on disk need an explicit write. Add comment explaining why starting at 0 is safe. Co-authored-by: Cursor <cursoragent@cursor.com>
- Add a comment explaining why votes are not persisted and why the votes queue must be advanced past loaded blocks on restart.
- Consolidate redundant tests: fold blockPersisted assertions into existing tests, remove TestNewInnerLoadedBlocksContiguousPrefix.
- Add a test that headers() returns ErrPruned for blocks before the loaded range (verifies votes queue advancement prevents hangs).

Co-authored-by: Cursor <cursoragent@cursor.com>
Replace require.Contains(err.Error(), "...") with require.Error(err). Callers don't branch on specific error messages, so string matching adds no value; the test name already documents what is being rejected. Co-authored-by: Cursor <cursoragent@cursor.com>
BlockPersister now owns the per-lane contiguous persistence cursor and passes the exclusive upper bound to the callback. The caller no longer needs to compute n+1 or guard against out-of-order completion. This localizes the ordering assumption (FIFO queue) inside BlockPersister, so switching to parallel storage only requires changing BlockPersister.Run. Co-authored-by: Cursor <cursoragent@cursor.com>
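The exclusive-upper-bound callback above can be sketched as a FIFO consumer. This is illustrative: the real BlockPersister persists protobuf block files, and the retry TODO is quoted from the following commit note.

```go
package main

import "fmt"

// runPersister consumes jobs in FIFO order and reports the exclusive upper
// bound of contiguously persisted block numbers to onPersisted, so the
// caller never computes n+1 or guards against out-of-order completion. The
// FIFO ordering assumption lives entirely in this loop; parallel storage
// would only change this function.
func runPersister(jobs []uint64, persist func(uint64) error, onPersisted func(upTo uint64)) error {
	for _, n := range jobs {
		if err := persist(n); err != nil {
			return err // TODO (from the commit): retry on persistence failure
		}
		onPersisted(n + 1) // exclusive bound: blocks up to n are durable
	}
	return nil
}

func main() {
	var upTo uint64
	_ = runPersister([]uint64{0, 1, 2},
		func(uint64) error { return nil },
		func(u uint64) { upTo = u })
	fmt.Println(upTo) // 3
}
```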
Also add TODO for retry on persistence failure. Co-authored-by: Cursor <cursoragent@cursor.com>
Force-pushed from a6a3e19 to 1581831 (Compare)
- Add appQCSend AtomicSend to publish latestAppQC updates.
- PushAppVote/PushAppQC store to appQCSend instead of calling persistAppQC under the lock.
- Run() spawns an appQCPersist goroutine that watches and persists asynchronously, matching the pattern used for block tips.
- Remove the persistAppQC method.
- Use q.pushBack() instead of direct queue field manipulation.
- Update the persistQueueSize comment for mixed job types.

Co-authored-by: Cursor <cursoragent@cursor.com>
BlockPersister no longer owns a channel or Run loop. Instead, a single goroutine in state.go watches inner.blocks for new entries, collects a batch under the lock, then calls bp.PersistBatch (no lock held) which persists blocks, updates tips, and cleans up old files. This removes backpressure on PushBlock/produceBlock — they just add to inner.blocks and return. Cleanup (DeleteBefore) is driven by the persist goroutine observing laneFirsts, removing explicit QueueDeleteBefore calls from PushAppVote/PushAppQC. Also: use q.pushBack() instead of direct queue field writes, remove dead Tips() method, simplify Queue drop comments. Co-authored-by: Cursor <cursoragent@cursor.com>
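The watch-collect-then-write pattern above (batch under the lock, I/O with the lock released, no channel and no backpressure) can be sketched with a condition variable. All names here are simplified stand-ins for inner.blocks and collectPersistBatch.

```go
package main

import (
	"fmt"
	"sync"
)

// state holds an in-memory queue that is itself the persist source: the
// watcher goroutine copies new entries under the lock, then "persists"
// (here: appends to out) with no lock held.
type state struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []int
	closed bool
}

func newState() *state {
	s := &state{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// push adds to the in-memory queue and returns immediately: no backpressure.
func (s *state) push(v int) {
	s.mu.Lock()
	s.items = append(s.items, v)
	s.mu.Unlock()
	s.cond.Broadcast()
}

func (s *state) close() {
	s.mu.Lock()
	s.closed = true
	s.mu.Unlock()
	s.cond.Broadcast()
}

// persistLoop waits for new entries, collects a batch under the lock, and
// does the slow write unlocked, mirroring the PR's persist goroutine.
func (s *state) persistLoop(out *[]int) {
	cur := 0 // exclusive cursor into items
	for {
		s.mu.Lock()
		for cur == len(s.items) && !s.closed {
			s.cond.Wait()
		}
		if cur == len(s.items) && s.closed {
			s.mu.Unlock()
			return
		}
		batch := append([]int(nil), s.items[cur:]...)
		cur = len(s.items)
		s.mu.Unlock()
		*out = append(*out, batch...) // disk I/O would happen here, unlocked
	}
}

func main() {
	s := newState()
	var persisted []int
	done := make(chan struct{})
	go func() { s.persistLoop(&persisted); close(done) }()
	for i := 0; i < 100; i++ {
		s.push(i)
	}
	s.close()
	<-done
	fmt.Println(len(persisted)) // 100, in order
}
```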
On restart, the node previously lost all CommitQCs between the last AppQC and the current head, forcing it to wait for re-gossip before fullCommitQC could push to DataState. This adds async CommitQC persistence alongside the existing block persistence:

- New CommitQCPersister (persist/commitqcs.go): individual files per road index, same load/sort/truncate-at-gap pattern as BlockPersister.
- Restore loaded CommitQCs into the inner.commitQCs queue on restart, filtering by the AppQC-derived first index.
- Extend the persist goroutine to collect and persist new CommitQCs alongside blocks, with lazy cleanup of old files.

Co-authored-by: Cursor <cursoragent@cursor.com>
Restructure newInner to load all persisted data (commitQCs, blocks) first, then call inner.prune(appQC, commitQC) at the end — same code path as runtime. This eliminates manual queue.first/next assignments and ensures init and normal operation use identical pruning logic. newInner now returns an error: if the persisted AppQC has no matching commitQC on disk, the state is corrupt (the AppQC is always persisted before old commitQC files are eligible for deletion). Co-authored-by: Cursor <cursoragent@cursor.com>
Between persist iterations (lock released), prune can delete map entries below q.first. Without clamping, the persist goroutine reads nil from the map, causing a nil pointer dereference. Clamp blockCur and commitQCCur to q.first before collecting batches. Add TestStateWithPersistence (5 iterations) that reliably triggers the crash without the fix. Co-authored-by: Cursor <cursoragent@cursor.com>
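The clamp described above reduces to one `max` against the queue's first index before reading. Types here are simplified stand-ins for the lane block queues.

```go
package main

import "fmt"

// collectBatch clamps the cursor to first before reading: between persist
// iterations the lock is released, and prune may have deleted map entries
// below first. Without the clamp, reads of deleted entries return zero
// values (nil pointers in the real code, hence the crash).
func collectBatch(q map[uint64]string, first, next, cur uint64) (batch []string, newCur uint64) {
	cur = max(cur, first) // entries below first were pruned while unlocked
	for n := cur; n < next; n++ {
		batch = append(batch, q[n])
	}
	return batch, next
}

func main() {
	// The queue held blocks 0..4; prune advanced first to 3 and deleted 0..2.
	q := map[uint64]string{3: "c", 4: "d"}
	// A stale cursor of 1 survives from the previous iteration.
	batch, cur := collectBatch(q, 3, 5, 1)
	fmt.Println(batch, cur) // [c d] 5
}
```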
prune() on an empty queue works but reads as "deleting entries" when the intent is "set starting position." Add reset(start) for clarity and replace the three prune() calls in newInner that operate on freshly created queues. Add unit tests for queue. Co-authored-by: Cursor <cursoragent@cursor.com>
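The reset-versus-prune distinction above can be sketched on a simplified queue (string payloads, no error handling); the real avail queue differs in payload types and invariant checks.

```go
package main

import "fmt"

// queue is a contiguous map-backed queue over [first, next).
type queue struct {
	first, next uint64
	q           map[uint64]string
}

func newQueue() *queue { return &queue{q: map[uint64]string{}} }

// reset sets the starting position; only meaningful on an empty queue.
// It states the intent "set starting position", where prune(start) on a
// fresh queue reads misleadingly as "delete entries".
func (q *queue) reset(start uint64) { q.first, q.next = start, start }

func (q *queue) pushBack(v string) { q.q[q.next] = v; q.next++ }

// prune deletes entries below to, advancing first.
func (q *queue) prune(to uint64) {
	for ; q.first < to && q.first < q.next; q.first++ {
		delete(q.q, q.first)
	}
}

func main() {
	q := newQueue()
	q.reset(10) // restart: restored entries begin at 10
	q.pushBack("a")
	q.pushBack("b")
	q.prune(11)
	fmt.Println(q.first, q.next, len(q.q)) // 11 12 1
}
```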
Extract the inline persist closure into two methods for readability. Move latestCommitQC.Store into the commitQC loading loop in newInner. Remove dead blockNum variable in produceBlock; use q.next directly. Return clamped commitQCCur via persistBatch to prevent busy-loop when prune empties the queue (value type not visible to caller otherwise). Co-authored-by: Cursor <cursoragent@cursor.com>
```go
for lane, next := range blockCur {
	inner.nextBlockToPersist[lane] = next
}
```
Check warning — Code scanning / CodeQL: Iteration over map
```go
for lane, q := range inner.blocks {
	if blockCur[lane] < q.next {
		return true
	}
}
```
Check warning — Code scanning / CodeQL: Iteration over map
```go
for lane, q := range inner.blocks {
	// Clamp cursor: prune may have deleted entries below q.first
	// between iterations while the lock was not held.
	blockCur[lane] = max(blockCur[lane], q.first)
	for n := blockCur[lane]; n < q.next; n++ {
		b.blocks = append(b.blocks, q.q[n])
	}
	b.laneFirsts[lane] = q.first
}
```
Check warning — Code scanning / CodeQL: Iteration over map
```go
if len(l.commitQCs) > 0 {
	i.commitQCs.reset(l.commitQCs[0].Index)
	for _, lqc := range l.commitQCs {
		if lqc.Index == i.commitQCs.next {
```
Review comment: return an error in case the loaded state is not a list of consecutive commitQCs.
```go
	return uint64(q.next) - uint64(q.first)
}

// reset sets the starting position of an empty queue.
```
Review comment (nit): you should just recreate the queue if you want to reset the state. Or you can call prune(start) if you want to move 0 -> start.
```go
inner utils.Watch[*inner]

// persister writes avail inner state to disk using A/B files; None when persistence is disabled.
persister utils.Option[persist.Persister[*apb.AppQC]]
```
Review comment (nit): group the persisters under a single Option. It would be kind of weird to allow an arbitrary subset of persisters.
```go
const innerFile = "avail_inner"

// loadPersistedState loads persisted avail state from disk and creates persisters for ongoing writes.
func loadPersistedState(dir string) (*loadedAvailState, persist.Persister[*apb.AppQC], *persist.BlockPersister, *persist.CommitQCPersister, error) {
```
Review comment: oof, this result type deserves a struct. It will combine nicely with the previous comment.
```diff
 	return nil
 }
-updated, err := inner.prune(appQC, qc)
+laneFirsts, err := inner.prune(appQC, qc)
```
Review comment: ugh, so the laneFirsts are not always used (wasted computation), and now we check for a non-nil map (where proper empty maps would do). Perhaps it was not such a good idea :/ Up to you if you want to revert to the previous version.
```go
if bp, ok := s.blockPersist.Get(); ok {
	cp, _ := s.commitQCPersist.Get()
	scope.SpawnNamed("persist", func() error {
		return s.persistLoop(ctx, bp, cp)
```
Review comment: once you get all persisters into a single struct, it can have its own Run() method to encapsulate these.
```go
func (s *State) persistLoop(ctx context.Context, bp *persist.BlockPersister, cp *persist.CommitQCPersister) error {
	blockCur := bp.LoadTips()
	var commitQCCur types.RoadIndex
	if cp != nil {
```
Review comment: either assume cp is non-nil, or make it Optional.
```go
// Update nextBlockToPersist under lock.
if len(batch.blocks) > 0 {
	for inner, ctrl := range s.inner.Lock() {
```
Review comment (nit): I would make it a method of State, MarkAsPersisted or similar.
```go
		return true
	}
}
return cp != nil && commitQCCur < inner.commitQCs.next
```
```go
// collectPersistBatch waits for new blocks or commitQCs and collects them under lock.
func (s *State) collectPersistBatch(
	ctx context.Context,
	cp *persist.CommitQCPersister,
```
Review comment: cp is only passed to check whether a commitQCs update should also be considered. This should always be a given (see comments above).
```go
if !ok {
	return nil
}
if err := p.Persist(types.AppQCConv.Encode(appQC)); err != nil {
```
Review comment: if I understand correctly, it is possible that AppQC persistence will outdistance CommitQC persistence, in which case we will end up with inconsistent state (an AppQC for a future CommitQC).
```go
if len(batch.blocks) > 0 {
	for inner, ctrl := range s.inner.Lock() {
		for lane, next := range blockCur {
			inner.nextBlockToPersist[lane] = next
```
Review comment: we need a similar mechanism for commitQCs; in particular, consensus.State should not start voting for height H+1 until the commitQC for H is persisted. Otherwise we can lose commitQCs.
```go
commitQCCur types.RoadIndex // clamped cursor (may have advanced past pruned entries)
}

func (s *State) persistLoop(ctx context.Context, bp *persist.BlockPersister, cp *persist.CommitQCPersister) error {
```
Review comment: this loop looks very similar to the lanevote subscription (which is intentional). Perhaps we could deduplicate this logic in an elegant way? (Not a request for change, just a thought.)
pompon0 left a comment:
The updated version looks very nice! Good work! There are still a few missing pieces; I've left comments.
Summary
Crash-safe persistence for availability state (AppQC, signed lane proposals, and CommitQCs). All I/O is fully asynchronous — no disk operations on the critical path or under locks.
- consensus/persist/ sub-package: generic A/B file persistence (Persister[T]) with crash-safe alternating writes. BlockPersister manages per-block files in a blocks/ subdirectory. CommitQCPersister manages per-road-index CommitQC files.
- Persister[T proto.Message] interface: strongly-typed persistence API; the concrete abPersister[T] handles the A/B file strategy. A/B suffixes are unexported; WriteRawFile helper for corruption tests.
- Block persistence (persist/blocks.go): each signed lane proposal stored as <lane_hex>_<blocknum>.pb. On load, blocks are sorted and truncated at the first gap. PersistBatch encapsulates the I/O: persist blocks, update tips, clean up old files.
- CommitQC persistence (persist/commitqcs.go): each CommitQC stored as commitqc_<roadindex>.pb. On load, QCs are sorted and truncated at the first gap. Needed for reconstructing FullCommitQCs on restart.
- Async persist goroutine (persistLoop): watches inner.blocks and inner.commitQCs directly (the in-memory state is the queue). collectPersistBatch acquires the lock, waits for new data, clamps cursors past pruned entries, and collects the batch. I/O runs with no lock held. No channel, no backpressure.
- Race handling: between persist iterations, prune can delete map entries. Cursors are clamped to q.first before reading to prevent a nil pointer dereference. Regression test (TestStateWithPersistence) reliably catches this.
- AppQC persistence: latestAppQC is published via AtomicSend; a separate goroutine subscribes and persists asynchronously. No I/O under the inner lock.
- State wiring (avail/state.go): NewState accepts stateDir, initialises the A/B persister (for AppQC), BlockPersister, and CommitQCPersister, and loads persisted data on restart.
- Restart restore (avail/inner.go): uses queue.reset() to set starting positions, then pushBack to reload entries. Finally calls inner.prune() with the persisted AppQC to advance all queues — same code path as runtime. Returns an error for corrupt state (AppQC without matching CommitQC on disk).
- Vote gating (avail/subscriptions.go): RecvBatch only yields blocks below the nextBlockToPersist watermark, so votes are only signed for durably written blocks.
- Cleanup: DeleteBefore removes files for pruned blocks and orphaned lanes (from previous committees). Driven by the persist goroutine observing laneFirsts.
- prune() returns a lane-firsts map: eliminates the separate snapshot helper; a nil return means no pruning occurred.
- newState() accepts a custom Persister for test mocks, avoiding fragile field mutation after construction.
- queue.reset() method: clearly sets the starting position of an empty queue, replacing misleading prune() calls during initialization.

Ref: sei-protocol/sei-v3#512
Test plan
- persist/blocks_test.go: load/store, gap truncation, DeleteBefore, orphaned lane cleanup, header mismatch, corrupt files
- persist/commitqcs_test.go: load/store, gap truncation, DeleteBefore, corrupt files
- persist/persist_test.go: A/B file crash safety, seq management, corrupt fallback
- avail/state_test.go: fresh start, load AppQC, load blocks, load both, load commitQCs, load commitQCs with AppQC, corrupt data, headers returns ErrPruned for blocks before the loaded range
- avail/state_test.go (TestStateWithPersistence): end-to-end persist + prune race regression test (5 iterations with disk persistence; reliably catches the cursor race without the clamp fix)
- avail/inner_test.go: newInner with loaded state, newInner with all three (AppQC + CommitQCs + blocks), newInner error cases (AppQC without matching CommitQC), nextBlockToPersist reconstruction, votes queue advancement
- avail/queue_test.go: newQueue, pushBack, reset, prune, stale prune, prune past next
- consensus/inner_test.go: consensus inner persistence round-trip, persist error propagation via newState injection
- data/state_test.go: data state tests
- types/proposal_test.go: proposal verification tests