feat(comms): durable per-parent outbox + drain-on-Stop/heartbeat (replaces push-into-tmux)#1226
Conversation
…urn_fingerprint + dead-letter (#1225) Step 1 of the durable per-parent outbox redesign. Adds the producer-side primitives the redirect will use, with no behavior change yet: - TurnFingerprint: coarse per-child-per-turn key for exactly-once consumer effects, stable across daemon-restart re-stamps (distinct from the emit-instant EventFingerprint). - CommitToInbox: last-wins-per-child atomic commit — at most one pending record per child, killing the busy-retry flood at the source. - DeadLetterSink: bounds the dropped_no_target ~1/sec runaway to a terminal state, logged exactly once after MaxUnresolvedAttempts. - TransitionNotificationEvent gains turn_fingerprint + attempts (additive, omitempty — persistence parity preserved). Tests (issue1225_outbox_producer_test.go): last-wins, distinct-children, turn_fingerprint stability/distinctness, dead-letter log-once-no-runaway. Committed by Ashesh Goplani
Step 2. The parent pulls from its durable outbox at its own cadence: - DrainInboxForParent: atomic read+truncate gate (concurrent drains partition records — no double-ack, no loss), collapse last-wins per child, skip turn_fingerprints already consumed (exactly-once EFFECTS across re-delivery), persist a TTL-bounded consumed-turns ledger per parent. - ForgetConsumedTurnsForChild: ledger cleanup hook for rm_sweep. - CLI: 'agent-deck inbox drain [--json] <self>' — the conductor heartbeat's first step. Legacy 'inbox <id>' raw form preserved. Tests: exactly-once across re-delivery, last-wins within a drain, concurrent drain (race) no-double/no-loss, CLI heartbeat-then-empty, --json shape, empty-inbox boundary, legacy back-compat. Committed by Ashesh Goplani
…rd (#1225) Step 3 — the busy-parent fix. When a parent finishes its turn, the real hook-handler Stop edge drains the durable outbox and emits {decision:"block",reason:<completions>}, injecting pending child completions as the parent's next turn input — at the moment it is provably free. A busy conductor therefore receives every completion at its next turn boundary, with zero forced interrupts and zero loss. - DrainForStopHook: drains + formats the injection, guarded by a max-consecutive-block budget (MaxStopHookBlocks) keyed on Claude's stop_hook_active flag, so a chatty child cannot trap the conductor in an infinite Stop->block loop (Agent Teams #47930 token-burn failure). On budget exhaustion it stops blocking WITHOUT draining, so pending records are preserved for the heartbeat (never lost to the guard). - hook-handler emits the decision on the Stop edge; harmless under the legacy async install, activates once the conductor Stop hook is flipped to sync (flagged for maintainer). Tests: unit (busy-parent exactly-once, max-blocks loop guard, guard-trip preserves records) + the HEADLINE capability proof driving the real compiled hook-handler end-to-end (the exact 2026-05-29 failure scenario). Committed by Ashesh Goplani
…achinery (#1225) Steps 4-5 — flip delivery from push to pull and delete the dead push path. Producers (unified onto ONE per-parent inbox): - NotifyTransition / NotifyFinished (interactive running->waiting) and DeliverCompletion (one-shot run-task kernel-exit) now commit to the durable outbox via commitEventToInbox instead of gating on an idle parent. A busy conductor is no longer missed — delivery doesn't depend on an idle window. - Removed ack-on-defer (taskworker.go:301): a one-shot completion is acked ONLY once provably committed to the durable inbox, never when it merely reached a volatile queue. DeliverCompletion returns true only on durable commit. - Daemon ReplayUnackedCompletions gained a bounded dead-letter budget: an unresolvable completion is dead-lettered + acked after MaxUnresolvedAttempts polls instead of replaying ~1/sec forever (the dropped_no_target runaway). Removed the dead push machinery (no production caller remains): prepareDispatch, dispatchAsync, scheduleBusyRetry, EnqueueDeferred/DrainRetryQueue, the deferred queue + busy-backoff + per-target send slots + terminated-fingerprint set, and their now-unused struct fields/consts. NotifyTransition's busy-gate (parent==StatusRunning -> deferred_target_busy) is gone. Close()/Flush() are now no-ops (delivery is a synchronous commit). SendSessionMessageReliable retained. rm_sweep: on child removal, also sweep the dead-letter record, the consumed-turn ledger entries, and the Stop-hook block budget — not just inbox lines. Wake-nudge (Tier-2 latency, gated): WakeNudger fires one debounced send-keys into an IDLE parent's pane; never a busy one; a dropped nudge is harmless. The inotify trigger glue is flagged for maintainer. Tests: deleted the busy-defer/async/deferred-queue tests (behavior removed); rewrote NotifyFinished/DeliverCompletion/ReplayUnacked/EmitDoneSignals to assert commit-to-inbox; repurposed the busy-enqueue test to prove a busy parent is now committed (case 10); added unified-producers, durability-across-restart, rm-cleanup, and wake-nudge tests. Committed by Ashesh Goplani
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (20)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (5)
📝 WalkthroughWalkthroughDurable per-parent JSONL inbox/outbox replaces push delivery: producers commit last-wins events with turn-fingerprints; parents drain via Stop-hook and heartbeat using a consumed-turn ledger for exactly-once effects. Adds dead-lettering, stop-block budget, wake-nudge, CLI drain, notifier refactor to synchronous commits, and comprehensive tests. ChangesIssue
Possibly Related PRs
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (1)
internal/session/inbox_consumer.go (1)
117-121: 💤 Low valueDocument partial-success semantics for callers.
When
saveConsumedTurnsLockedfails, the inbox is already truncated but the consumed fingerprints aren't persisted. Returning(out, err)is correct to avoid data loss, but callers must understand that if they act onoutwhile ignoring the error, a future re-emit of the same turn may not be deduplicated. Consider adding a brief comment here clarifying that callers should log/propagate the error even though they process the events.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/session/inbox_consumer.go` around lines 117 - 121, The truncation + persistence call can fail: when dirty is true and saveConsumedTurnsLocked(parentID, consumed) returns an error the inbox has already been truncated but consumed fingerprints were not saved, so callers that proceed with out risk future duplicate re-emits not being deduplicated; add a concise inline comment at the call site (near the if dirty { ... } block) documenting this partial-success semantic and instructing callers to log or propagate the error and not silently ignore it if they act on out, so downstream code knows it must treat out as provisional until persistence succeeds.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/session/inbox_outbox.go`:
- Around line 180-203: RecordUnresolvable currently marks s.logged[child]=true
and returns true (allowing ack) before confirming durable persistence because
writeDeadLetter(event) is ignored; change the flow so you call
writeDeadLetter(event) and check its error first, only set s.logged[child]=true
and return true if writeDeadLetter succeeded; if it fails, do not mark logged
and return false (or propagate the error), ensuring any shared-state updates to
s.logged and s.attempts remain protected by s.mu around the mutation and that
event.Attempts is still set appropriately (n) only after a successful write;
locate this logic in the RecordUnresolvable function and update the ordering and
error handling for writeDeadLetter, s.logged[child], and the returned boolean.
In `@internal/session/inbox_stophook.go`:
- Around line 90-117: The code uses a process-local mutex (stopBlockMu) while
the stop-block counter is persisted, so concurrent handlers in different
processes can race; change the read-modify-write sequence around
loadStopBlockCountLocked/saveStopBlockCountLocked to use an inter-process lock
(e.g., a per-instance file lock or a persistent CAS/transaction) rather than
stopBlockMu, and hold that inter-process lock for the entire sequence that reads
count, applies the "fresh user turn" reset, checks against MaxStopHookBlocks,
invokes DrainInboxForParent if needed, and then writes the updated count; update
references in the same code block (functions loadStopBlockCountLocked,
saveStopBlockCountLocked, DrainInboxForParent, MaxStopHookBlocks, and
StopHookDecision) so the logic is atomic across processes.
In `@internal/session/issue1225_outbox_consumer_test.go`:
- Around line 114-126: The goroutines calling DrainInboxForParent currently
swallow errors which can hide race bugs; modify the test so each goroutine sends
any non-nil error to a shared error channel (or appends to a protected errors
slice) instead of returning silently, then after wg.Wait() drain the channel and
fail the test (e.g., t.Fatalf or require.NoError) if any error was reported;
reference the goroutine that calls DrainInboxForParent(parent), the wg
synchronization, and the seen map/mu so you place the error reporting alongside
the existing mu.Lock/Unlock logic and verify errors after waiting.
- Around line 70-90: The test TestIssue1225_DrainInbox_LastWinsWithinDrain
currently commits one record per distinct child so it never exercises
"last-wins" collapse; change the test to CommitToInbox multiple
TransitionNotificationEvent records for the same ChildSessionID (e.g., call
CommitToInbox twice for "child-1" with different Timestamp/ToStatus values) and
then call DrainInboxForParent and assert that only the latest record for that
child is returned (verify len(got)==1 and the delivered record matches the last
committed event); keep CommitToInbox and DrainInboxForParent as the interaction
points to modify.
In `@internal/session/issue1225_outbox_producer_test.go`:
- Around line 19-26: The test helper inboxTestHome sets HOME and other env vars
but doesn't clear the user-config cache, so stale config can bleed between
tests; update inboxTestHome to call the user-config cache reset in addition to
ResetInboxFingerprintCacheForTest() (e.g., invoke ResetUserConfigCacheForTest()
or the appropriate ResetUserConfig... function) immediately after setting the
env vars so the config cache is cleared for each test run.
In `@internal/session/issue1225_unify_durability_test.go`:
- Around line 98-99: The call to DrainInboxForParent currently discards its
error return; change the call to capture both returns (e.g., again, err :=
DrainInboxForParent(parentID)), then assert err == nil (e.g., t.Fatalf("drain
error: %v", err) if err != nil) before asserting len(again) == 0, so any drain
failure is reported instead of hidden; reference DrainInboxForParent and
parentID in the fix.
In `@internal/session/rm_sweep.go`:
- Around line 52-63: The current early return when sweepInboxFilesForChild(dir,
entries, childSessionID) returns an error prevents the subsequent cleanup calls
(DeadLetterPathFor(childSessionID), ForgetConsumedTurnsForChild(childSessionID),
ResetStopBlockBudget(childSessionID)) from running; change the control flow to
always perform those three cleanup actions regardless of the sweep error: call
sweepInboxFilesForChild and capture totalDropped and err, then unconditionally
run the dead-letter remove and the two Forget/Reset calls, and finally return
totalDropped and the original err (if any) so cleanup runs even on partial sweep
failures.
---
Nitpick comments:
In `@internal/session/inbox_consumer.go`:
- Around line 117-121: The truncation + persistence call can fail: when dirty is
true and saveConsumedTurnsLocked(parentID, consumed) returns an error the inbox
has already been truncated but consumed fingerprints were not saved, so callers
that proceed with out risk future duplicate re-emits not being deduplicated; add
a concise inline comment at the call site (near the if dirty { ... } block)
documenting this partial-success semantic and instructing callers to log or
propagate the error and not silently ignore it if they act on out, so downstream
code knows it must treat out as provisional until persistence succeeds.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 89da4f2c-983e-4663-9186-a6eb55b7ada3
📒 Files selected for processing (32)
cmd/agent-deck/hook_handler.gocmd/agent-deck/inbox_cmd.gocmd/agent-deck/issue1225_inbox_drain_cli_test.gointernal/session/event_fingerprint.gointernal/session/inbox.gointernal/session/inbox_consumer.gointernal/session/inbox_nudge.gointernal/session/inbox_outbox.gointernal/session/inbox_stophook.gointernal/session/issue1186_done_emit_test.gointernal/session/issue1225_outbox_consumer_test.gointernal/session/issue1225_outbox_producer_test.gointernal/session/issue1225_rm_cleanup_test.gointernal/session/issue1225_stophook_test.gointernal/session/issue1225_unify_durability_test.gointernal/session/issue1225_wake_nudge_test.gointernal/session/issue962_no_replay_test.gointernal/session/issue962_target_busy_ttl_test.gointernal/session/issue962_v3_mute_replay_test.gointernal/session/rm_sweep.gointernal/session/rm_sweep_test.gointernal/session/taskworker.gointernal/session/taskworker_test.gointernal/session/transition_daemon.gointernal/session/transition_notifier.gointernal/session/transition_notifier_async_test.gointernal/session/transition_notifier_dedup_test.gointernal/session/transition_notifier_fdleak_test.gointernal/session/transition_notifier_inbox_test.gointernal/session/transition_notifier_queue_test.gointernal/session/transition_notifier_test.gotests/capability/issue1225_busy_parent_test.go
💤 Files with no reviewable changes (6)
- internal/session/issue962_v3_mute_replay_test.go
- internal/session/transition_notifier_async_test.go
- internal/session/transition_notifier_queue_test.go
- internal/session/transition_notifier_fdleak_test.go
- internal/session/issue962_no_replay_test.go
- internal/session/transition_notifier_test.go
| func (s *DeadLetterSink) RecordUnresolvable(event TransitionNotificationEvent) bool { | ||
| child := strings.TrimSpace(event.ChildSessionID) | ||
| if child == "" { | ||
| return false | ||
| } | ||
| s.mu.Lock() | ||
| if s.logged[child] { | ||
| s.mu.Unlock() | ||
| return false | ||
| } | ||
| s.attempts[child]++ | ||
| n := s.attempts[child] | ||
| if n < MaxUnresolvedAttempts { | ||
| s.mu.Unlock() | ||
| return false | ||
| } | ||
| s.logged[child] = true | ||
| s.mu.Unlock() | ||
|
|
||
| event.Attempts = n | ||
| _ = writeDeadLetter(event) | ||
| s.writeMissedOnce(event) | ||
| return true | ||
| } |
There was a problem hiding this comment.
Ack can occur without durable dead-letter persistence.
RecordUnresolvable returns true even when writeDeadLetter fails (_ = writeDeadLetter(event)), and it marks logged[child]=true before persistence. Downstream callers treat true as safe-to-ack, so a dead-letter write failure can silently drop the completion.
💡 Proposed fix
func (s *DeadLetterSink) RecordUnresolvable(event TransitionNotificationEvent) bool {
child := strings.TrimSpace(event.ChildSessionID)
if child == "" {
return false
}
s.mu.Lock()
- if s.logged[child] {
- s.mu.Unlock()
+ defer s.mu.Unlock()
+ if s.logged[child] {
return false
}
s.attempts[child]++
n := s.attempts[child]
if n < MaxUnresolvedAttempts {
- s.mu.Unlock()
return false
}
- s.logged[child] = true
- s.mu.Unlock()
event.Attempts = n
- _ = writeDeadLetter(event)
+ if err := writeDeadLetter(event); err != nil {
+ // Keep it retryable; do not acknowledge upstream.
+ return false
+ }
+ s.logged[child] = true
s.writeMissedOnce(event)
return true
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/inbox_outbox.go` around lines 180 - 203, RecordUnresolvable
currently marks s.logged[child]=true and returns true (allowing ack) before
confirming durable persistence because writeDeadLetter(event) is ignored; change
the flow so you call writeDeadLetter(event) and check its error first, only set
s.logged[child]=true and return true if writeDeadLetter succeeded; if it fails,
do not mark logged and return false (or propagate the error), ensuring any
shared-state updates to s.logged and s.attempts remain protected by s.mu around
the mutation and that event.Attempts is still set appropriately (n) only after a
successful write; locate this logic in the RecordUnresolvable function and
update the ordering and error handling for writeDeadLetter, s.logged[child], and
the returned boolean.
| stopBlockMu.Lock() | ||
| defer stopBlockMu.Unlock() | ||
|
|
||
| count := loadStopBlockCountLocked(instanceID) | ||
| if !stopHookActive { | ||
| // Fresh user turn: reset the consecutive-block budget. | ||
| count = 0 | ||
| } | ||
|
|
||
| // Budget exhausted: stop blocking so the conductor can reach idle. Leave any | ||
| // pending records untouched for the heartbeat to drain. | ||
| if count >= MaxStopHookBlocks { | ||
| saveStopBlockCountLocked(instanceID, count) | ||
| return StopHookDecision{}, false, nil | ||
| } | ||
|
|
||
| events, err := DrainInboxForParent(instanceID) | ||
| if err != nil { | ||
| return StopHookDecision{}, false, err | ||
| } | ||
| if len(events) == 0 { | ||
| // Nothing to inject — let the conductor go idle and reset the budget. | ||
| saveStopBlockCountLocked(instanceID, 0) | ||
| return StopHookDecision{}, false, nil | ||
| } | ||
|
|
||
| saveStopBlockCountLocked(instanceID, count+1) | ||
| return StopHookDecision{ |
There was a problem hiding this comment.
Stop-block budget updates are not synchronized across processes.
Line [90] uses stopBlockMu, but the count state is persisted on disk and updated via read-modify-write. A process-local mutex does not protect concurrent hook-handler processes for the same instance, so MaxStopHookBlocks can be bypassed under overlap.
As per coding guidelines internal/session/**: "Pay extra attention to: ... race conditions (this code runs with -race in CI)".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/inbox_stophook.go` around lines 90 - 117, The code uses a
process-local mutex (stopBlockMu) while the stop-block counter is persisted, so
concurrent handlers in different processes can race; change the
read-modify-write sequence around
loadStopBlockCountLocked/saveStopBlockCountLocked to use an inter-process lock
(e.g., a per-instance file lock or a persistent CAS/transaction) rather than
stopBlockMu, and hold that inter-process lock for the entire sequence that reads
count, applies the "fresh user turn" reset, checks against MaxStopHookBlocks,
invokes DrainInboxForParent if needed, and then writes the updated count; update
references in the same code block (functions loadStopBlockCountLocked,
saveStopBlockCountLocked, DrainInboxForParent, MaxStopHookBlocks, and
StopHookDecision) so the logic is atomic across processes.
| // Within a single drain, multiple records for one child collapse to one. | ||
| func TestIssue1225_DrainInbox_LastWinsWithinDrain(t *testing.T) { | ||
| inboxTestHome(t) | ||
| parent := "conductor-c-1777000110" | ||
| // Two distinct children, each with a single pending record after commit. | ||
| for _, c := range []string{"child-1", "child-2"} { | ||
| if err := CommitToInbox(parent, TransitionNotificationEvent{ | ||
| ChildSessionID: c, ChildTitle: c, Profile: "personal", | ||
| FromStatus: "running", ToStatus: "waiting", Timestamp: time.Now(), | ||
| }); err != nil { | ||
| t.Fatalf("commit %s: %v", c, err) | ||
| } | ||
| } | ||
| got, err := DrainInboxForParent(parent) | ||
| if err != nil { | ||
| t.Fatalf("drain: %v", err) | ||
| } | ||
| if len(got) != 2 { | ||
| t.Fatalf("expected 2 deliverables (one per child), got %d", len(got)) | ||
| } | ||
| } |
There was a problem hiding this comment.
Test doesn't fully exercise last-wins collapse.
The test name suggests verifying last-wins semantics, but it commits one record per child (two distinct children). To properly test last-wins, commit multiple records for the same child with different timestamps and verify only the latest is returned.
🧪 Suggested addition to properly test last-wins
func TestIssue1225_DrainInbox_LastWinsWithinDrain(t *testing.T) {
inboxTestHome(t)
parent := "conductor-c-1777000110"
- // Two distinct children, each with a single pending record after commit.
- for _, c := range []string{"child-1", "child-2"} {
- if err := CommitToInbox(parent, TransitionNotificationEvent{
- ChildSessionID: c, ChildTitle: c, Profile: "personal",
- FromStatus: "running", ToStatus: "waiting", Timestamp: time.Now(),
- }); err != nil {
- t.Fatalf("commit %s: %v", c, err)
- }
+ // Commit multiple records for the same child — only the last should surface.
+ child := "child-lastwin"
+ for i, status := range []string{"waiting", "running", "waiting"} {
+ if err := CommitToInbox(parent, TransitionNotificationEvent{
+ ChildSessionID: child, ChildTitle: child, Profile: "personal",
+ FromStatus: "running", ToStatus: status,
+ LastOutputHash: fmt.Sprintf("hash-%d", i),
+ Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond),
+ }); err != nil {
+ t.Fatalf("commit %d: %v", i, err)
+ }
}
got, err := DrainInboxForParent(parent)
if err != nil {
t.Fatalf("drain: %v", err)
}
- if len(got) != 2 {
- t.Fatalf("expected 2 deliverables (one per child), got %d", len(got))
+ if len(got) != 1 {
+ t.Fatalf("expected 1 deliverable (last-wins for single child), got %d", len(got))
+ }
+ if got[0].LastOutputHash != "hash-2" {
+ t.Fatalf("expected last record (hash-2), got %s", got[0].LastOutputHash)
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Within a single drain, multiple records for one child collapse to one. | |
| func TestIssue1225_DrainInbox_LastWinsWithinDrain(t *testing.T) { | |
| inboxTestHome(t) | |
| parent := "conductor-c-1777000110" | |
| // Two distinct children, each with a single pending record after commit. | |
| for _, c := range []string{"child-1", "child-2"} { | |
| if err := CommitToInbox(parent, TransitionNotificationEvent{ | |
| ChildSessionID: c, ChildTitle: c, Profile: "personal", | |
| FromStatus: "running", ToStatus: "waiting", Timestamp: time.Now(), | |
| }); err != nil { | |
| t.Fatalf("commit %s: %v", c, err) | |
| } | |
| } | |
| got, err := DrainInboxForParent(parent) | |
| if err != nil { | |
| t.Fatalf("drain: %v", err) | |
| } | |
| if len(got) != 2 { | |
| t.Fatalf("expected 2 deliverables (one per child), got %d", len(got)) | |
| } | |
| } | |
| // Within a single drain, multiple records for one child collapse to one. | |
| func TestIssue1225_DrainInbox_LastWinsWithinDrain(t *testing.T) { | |
| inboxTestHome(t) | |
| parent := "conductor-c-1777000110" | |
| // Commit multiple records for the same child — only the last should surface. | |
| child := "child-lastwin" | |
| for i, status := range []string{"waiting", "running", "waiting"} { | |
| if err := CommitToInbox(parent, TransitionNotificationEvent{ | |
| ChildSessionID: child, ChildTitle: child, Profile: "personal", | |
| FromStatus: "running", ToStatus: status, | |
| LastOutputHash: fmt.Sprintf("hash-%d", i), | |
| Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond), | |
| }); err != nil { | |
| t.Fatalf("commit %d: %v", i, err) | |
| } | |
| } | |
| got, err := DrainInboxForParent(parent) | |
| if err != nil { | |
| t.Fatalf("drain: %v", err) | |
| } | |
| if len(got) != 1 { | |
| t.Fatalf("expected 1 deliverable (last-wins for single child), got %d", len(got)) | |
| } | |
| if got[0].LastOutputHash != "hash-2" { | |
| t.Fatalf("expected last record (hash-2), got %s", got[0].LastOutputHash) | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/issue1225_outbox_consumer_test.go` around lines 70 - 90, The
test TestIssue1225_DrainInbox_LastWinsWithinDrain currently commits one record
per distinct child so it never exercises "last-wins" collapse; change the test
to CommitToInbox multiple TransitionNotificationEvent records for the same
ChildSessionID (e.g., call CommitToInbox twice for "child-1" with different
Timestamp/ToStatus values) and then call DrainInboxForParent and assert that
only the latest record for that child is returned (verify len(got)==1 and the
delivered record matches the last committed event); keep CommitToInbox and
DrainInboxForParent as the interaction points to modify.
| go func() { | ||
| defer wg.Done() | ||
| out, err := DrainInboxForParent(parent) | ||
| if err != nil { | ||
| return | ||
| } | ||
| mu.Lock() | ||
| for _, ev := range out { | ||
| seen[ev.ChildSessionID]++ | ||
| } | ||
| mu.Unlock() | ||
| }() | ||
| } |
There was a problem hiding this comment.
Silently ignoring drain errors may hide race bugs.
When DrainInboxForParent returns an error, the goroutine silently exits without recording the failure. This could mask implementation bugs that only manifest under concurrency. Consider collecting errors and failing the test if any goroutine encounters one.
🧪 Proposed fix to capture and report errors
var wg sync.WaitGroup
var mu sync.Mutex
seen := map[string]int{}
+ var errs []error
for g := 0; g < 6; g++ {
wg.Add(1)
go func() {
defer wg.Done()
out, err := DrainInboxForParent(parent)
if err != nil {
+ mu.Lock()
+ errs = append(errs, err)
+ mu.Unlock()
return
}
mu.Lock()
for _, ev := range out {
seen[ev.ChildSessionID]++
}
mu.Unlock()
}()
}
wg.Wait()
+
+ if len(errs) > 0 {
+ t.Fatalf("concurrent drain errors: %v", errs)
+ }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/issue1225_outbox_consumer_test.go` around lines 114 - 126,
The goroutines calling DrainInboxForParent currently swallow errors which can
hide race bugs; modify the test so each goroutine sends any non-nil error to a
shared error channel (or appends to a protected errors slice) instead of
returning silently, then after wg.Wait() drain the channel and fail the test
(e.g., t.Fatalf or require.NoError) if any error was reported; reference the
goroutine that calls DrainInboxForParent(parent), the wg synchronization, and
the seen map/mu so you place the error reporting alongside the existing
mu.Lock/Unlock logic and verify errors after waiting.
| func inboxTestHome(t *testing.T) { | ||
| t.Helper() | ||
| home := t.TempDir() | ||
| t.Setenv("HOME", home) | ||
| t.Setenv("AGENT_DECK_HOME", "") | ||
| t.Setenv("AGENT_DECK_PROFILE", "") | ||
| ResetInboxFingerprintCacheForTest() | ||
| } |
There was a problem hiding this comment.
Clear config cache in test-home setup to avoid path bleed between tests.
inboxTestHome updates env vars but doesn’t reset user-config cache. If cache is warm from another test, these tests can read/write under the wrong home dir.
💡 Proposed fix
func inboxTestHome(t *testing.T) {
t.Helper()
home := t.TempDir()
t.Setenv("HOME", home)
t.Setenv("AGENT_DECK_HOME", "")
t.Setenv("AGENT_DECK_PROFILE", "")
+ ClearUserConfigCache()
+ t.Cleanup(func() {
+ ClearUserConfigCache()
+ ResetInboxFingerprintCacheForTest()
+ })
ResetInboxFingerprintCacheForTest()
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/issue1225_outbox_producer_test.go` around lines 19 - 26, The
test helper inboxTestHome sets HOME and other env vars but doesn't clear the
user-config cache, so stale config can bleed between tests; update inboxTestHome
to call the user-config cache reset in addition to
ResetInboxFingerprintCacheForTest() (e.g., invoke ResetUserConfigCacheForTest()
or the appropriate ResetUserConfig... function) immediately after setting the
env vars so the config cache is cleared for each test run.
| if again, _ := DrainInboxForParent(parentID); len(again) != 0 { | ||
| t.Fatalf("exactly-once: re-drain must be empty, got %d", len(again)) |
There was a problem hiding this comment.
Assert the second drain error instead of discarding it.
Ignoring the error here can mask a drain failure and still pass on len(again)==0. Assert err == nil before checking emptiness.
Suggested fix
- if again, _ := DrainInboxForParent(parentID); len(again) != 0 {
- t.Fatalf("exactly-once: re-drain must be empty, got %d", len(again))
- }
+ again, err := DrainInboxForParent(parentID)
+ if err != nil {
+ t.Fatalf("re-drain: %v", err)
+ }
+ if len(again) != 0 {
+ t.Fatalf("exactly-once: re-drain must be empty, got %d", len(again))
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if again, _ := DrainInboxForParent(parentID); len(again) != 0 { | |
| t.Fatalf("exactly-once: re-drain must be empty, got %d", len(again)) | |
| again, err := DrainInboxForParent(parentID) | |
| if err != nil { | |
| t.Fatalf("re-drain: %v", err) | |
| } | |
| if len(again) != 0 { | |
| t.Fatalf("exactly-once: re-drain must be empty, got %d", len(again)) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/issue1225_unify_durability_test.go` around lines 98 - 99,
The call to DrainInboxForParent currently discards its error return; change the
call to capture both returns (e.g., again, err :=
DrainInboxForParent(parentID)), then assert err == nil (e.g., t.Fatalf("drain
error: %v", err) if err != nil) before asserting len(again) == 0, so any drain
failure is reported instead of hidden; reference DrainInboxForParent and
parentID in the fix.
| totalDropped, err := sweepInboxFilesForChild(dir, entries, childSessionID) | ||
| if err != nil { | ||
| return totalDropped, err | ||
| } | ||
|
|
||
| // Issue #1225: sweep the rest of the child's outbox footprint so a reused id | ||
| // can't inherit stale state and per-parent ledgers don't leak. Best-effort — | ||
| // these never fail the rm. | ||
| _ = os.Remove(DeadLetterPathFor(childSessionID)) // dead-lettered records | ||
| ForgetConsumedTurnsForChild(childSessionID) // consumed-turn ledgers | ||
| ResetStopBlockBudget(childSessionID) // Stop-hook block budget (if it was a parent) | ||
|
|
There was a problem hiding this comment.
Run artifact cleanup even if inbox sweep returns an error.
The early return at Line [53] skips dead-letter/ledger/stop-budget cleanup on partial sweep failure, which leaves stale state behind.
Suggested adjustment
- totalDropped, err := sweepInboxFilesForChild(dir, entries, childSessionID)
- if err != nil {
- return totalDropped, err
- }
+ totalDropped, sweepErr := sweepInboxFilesForChild(dir, entries, childSessionID)
// Issue `#1225`: sweep the rest of the child's outbox footprint so a reused id
// can't inherit stale state and per-parent ledgers don't leak. Best-effort —
// these never fail the rm.
_ = os.Remove(DeadLetterPathFor(childSessionID)) // dead-lettered records
ForgetConsumedTurnsForChild(childSessionID) // consumed-turn ledgers
ResetStopBlockBudget(childSessionID) // Stop-hook block budget (if it was a parent)
- return totalDropped, nil
+ if sweepErr != nil {
+ return totalDropped, sweepErr
+ }
+ return totalDropped, nil🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/session/rm_sweep.go` around lines 52 - 63, The current early return
when sweepInboxFilesForChild(dir, entries, childSessionID) returns an error
prevents the subsequent cleanup calls (DeadLetterPathFor(childSessionID),
ForgetConsumedTurnsForChild(childSessionID),
ResetStopBlockBudget(childSessionID)) from running; change the control flow to
always perform those three cleanup actions regardless of the sweep error: call
sweepInboxFilesForChild and capture totalDropped and err, then unconditionally
run the dead-letter remove and the two Forget/Reset calls, and finally return
totalDropped and the original err (if any) so cleanup runs even on partial sweep
failures.
…outbox (#1225) Resolves the adversarial-audit findings on PR #1226. The headline contract is now TRUE at-least-once-with-dedup (never lost), reconciling the contradictory at-most- once comments. B1 drain is two-phase: stage records to an fsync'd in-flight WAL BEFORE truncating the inbox, finalize the consumed-turn ledger (fsync) AFTER, drop the WAL last. A crash between truncate and ledger-save re-delivers; dedup collapses it. B2 fsync the producer append paths (appendInboxLineLocked, writeDeadLetter, WriteInboxEvent) and check Sync errors. B3 ReadDeadLetter skips corrupt lines instead of aborting on the first. B4 saveStopBlockCountLocked surfaces write errors; DrainForStopHook reserves the block slot durably BEFORE draining (no infinite loop, no consume-and-lose); writeMissedOnce logs every error path. B5 distinct dead-letter reasons (orphan/child_removed/parent_removed/no_notify/ self_conductor); operator-visible terminalDrop; rm sweeps a removed parent's own inbox + in-flight WAL + consumed-turn ledger. B6 scanner cap raised to 16MB + scanner.Err() checked; producer caps DoneSummary to 32KB so an oversized event can't silently truncate or fail the drain. B7 `inbox drain` resolves self via AGENTDECK_INSTANCE_ID -> AGENT_DECK_SESSION_ID -> tmux (robust in worktree/sandbox/cron); bare form + `self` keyword. B8 stop_hook_active is *bool; an absent flag fails safe to active=true so the MaxStopHookBlocks loop guard can't be reset away. B9 orphan child keeps its operator-visible WARN (now a distinct reason). B10 inbox-dir MkdirAll failure surfaces on the producer path (regression test). B11 corrupt-line coverage for ReadAndTruncateInbox + DrainInboxForParent; the misleading TOCTOU/atomicity comment rewritten to the real durability window. B12 Stop-hook drain is runtime-scoped to conductor/parent sessions: a session with an empty inbox fast-returns with no block and zero ledger writes, so the staged global sync flip is inert for leaf sessions. Scoped activation diff noted. B13 daemon-level integration test drives TransitionDaemon.syncProfile end-to-end (status-DB path, no tmux, no claude -p) proving the interactive running->waiting producer commits to the busy parent's durable inbox. Verify: go test -race ./internal/session/... ./cmd/agent-deck/... green; golangci-lint 0 issues; govulncheck 0 vulnerabilities; busy-parent proof 10x green. Committed by Ashesh Goplani
Audit blockers closed — all 18 fix-before-merge findings (commit 6e8876f)Resolved every fix-before-merge finding from the 38-agent audit. Contract decision: TRUE at-least-once-with-dedup (never lost) — reconciles the at-most-once vs at-least-once contradiction the audit flagged. B1 (keystone) — durabilityThe drain is now a two-phase commit with a per-parent in-flight WAL: stage records to Every blocker
Verification
Activation (B12) — separate PRThe global async→sync Stop-hook flip stays in |
| if err != nil { | ||
| return err | ||
| } | ||
| defer f.Close() |
| slog.String("child", event.ChildSessionID), slog.String("error", err.Error())) | ||
| return | ||
| } | ||
| defer f.Close() |
| if err != nil { | ||
| return err | ||
| } | ||
| defer f.Close() |
…tbeat drain) + release v1.9.44 (#1227) Turns ON the durable per-parent outbox comms engine merged in #1226. The engine shipped inert; this is the activation. - claude_hooks.go: flip the Stop hook from Async:true to SYNC so Claude Code reads the {decision:"block",reason} the hook emits and injects busy-parent completions at the conductor's next turn boundary. The flip is conductor-scoped at RUNTIME (hooks install per config-dir, shared by conductor + workers, so there is no install-time seam): DrainForStopHook's InboxHasPending fast-path returns no-block with zero ledger writes for any session with an empty inbox — every leaf / non-conductor session — so the flip is inert for them (audit B12). Loop guard is crash-safe (B4) and fails safe on an absent stop_hook_active flag (B8). - conductor_templates.go + documentation/CONDUCTOR.md: wire `agent-deck inbox drain self` as the first step of every conductor heartbeat — the idle-conductor drain fallback that complements the busy-conductor Stop hook. - Regression test guarding the Stop-sync flip; existing B12 leaf test proves non-conductor sessions never block nor write the stop-block ledger. - Bump Version to 1.9.44; CHANGELOG credits the durable-outbox comms fix (#1225/#1226), the activation, and the credential 401 fix (#1222/#1224). Zero billed inference — the hook is a Go handler, no `claude -p`. Roll out canary-first to one conductor before flipping the fleet (GAP §5). Committed by Ashesh Goplani
PR #1230 audit hardening. sendWakeNudgeNoWait ran exec.Command(...).Run() with no deadline, so a wedged agent-deck binary (e.g. stuck on SQLite/tmux) would leak the detached dispatch goroutine indefinitely. Wrap the subprocess in context.WithTimeout(5s) + exec.CommandContext so a stuck send is reaped. A timed-out send is harmless like any dropped nudge — the durable record still drains on the parent's next turn/heartbeat. The exec is now a package var (wakeNudgeExec) so the deadline + command shape are verified deterministically without spawning a real slow process. Also adds a cheap test that the PRODUCTION defaultWakeNudgeWiring routes its idle probe through the conductor-scoped gate end-to-end (idle conductor nudged; busy conductor and non-conductor leaf not). Refs #1225 #1226. Committed by Ashesh Goplani
… (no 14-min lag) (#1230) * feat(comms): wire wake-nudge for near-instant idle-conductor delivery (no 14-min lag) v1.9.44 (#1226) shipped the durable per-parent outbox + drain (#1225): correct, no loss, exactly-once. But an IDLE conductor only drained on its next heartbeat — up to ~14 min of latency. The WakeNudger (internal/session/inbox_nudge.go) was built and unit-tested but nothing triggered it. Wire it at the single producer commit chokepoint, commitEventToInbox, which BOTH producers funnel through: the interactive running→waiting path (NotifyTransition/ NotifyFinished) and the one-shot run-task kernel-exit path (DeliverCompletion, via runtask_cmd and the daemon replay). The moment a completion durably lands in a parent's inbox, an IDLE conductor is woken to drain it — collapsing the idle worst case from ~14 min to sub-second. The nudge is event-driven (fired on commit, not polled — no new poll loop, no inotify watcher and its kernel-exit race), conductor-scoped, idle-gated (never send-keys into a busy pane — that only queues the keystroke, the exact failure the pull model avoids), debounced per-parent (~500ms; coalesces a burst of simultaneous completions into one wake without delaying the first), and best-effort/fire-and-forget. A dropped or failed nudge is harmless: the same durable record is still drained on the parent's next Stop/heartbeat (wake ≠ deliver). A busy parent is left to drain at its next turn boundary — the physical floor for a busy Claude pane. Zero billed inference. Refs #1225 #1226. Bumps version to 1.9.45 + CHANGELOG. Committed by Ashesh Goplani * fix(comms): bound the detached wake-nudge send with a 5s context timeout PR #1230 audit hardening. sendWakeNudgeNoWait ran exec.Command(...).Run() with no deadline, so a wedged agent-deck binary (e.g. stuck on SQLite/tmux) would leak the detached dispatch goroutine indefinitely. Wrap the subprocess in context.WithTimeout(5s) + exec.CommandContext so a stuck send is reaped. A timed-out send is harmless like any dropped nudge — the durable record still drains on the parent's next turn/heartbeat. The exec is now a package var (wakeNudgeExec) so the deadline + command shape are verified deterministically without spawning a real slow process. Also adds a cheap test that the PRODUCTION defaultWakeNudgeWiring routes its idle probe through the conductor-scoped gate end-to-end (idle conductor nudged; busy conductor and non-conductor leaf not). Refs #1225 #1226. Committed by Ashesh Goplani
Closes #1225.
Problem
A child finishing while its parent conductor is busy (mid-turn) never receives the completion — or gets it late + duplicated. The old model pushes into the parent's tmux pane and only sends when
parent != StatusRunning(transition_notifier.go:451). A conductor is almost always running, so dispatch returnsdeferred_target_busy,scheduleBusyRetryre-checks the same idle-gate at {5s,15s,45s}, the deferred queue ages out, and the event is logged missed. Even Claude only queues keystrokes typed mid-turn (#36326). Proven live 2026-05-29 (childdc3247c3→ parent16c012a4detected+targeted, never delivered).Fix — pull, not push
A durable per-parent outbox (
~/.agent-deck/inboxes/<parent>.jsonl) the parent drains at its own turn boundary (Stop hook →decision:blockinject) and on heartbeat, with a gated wake-nudge for latency. One queue, two producers (interactiverunning→waiting+ one-shotrun-taskkernel-exit), unified. Last-wins per child,turn_fingerprintdedup → exactly-once effects,attempts/TTL + dead-letter to kill thedropped_no_targetrunaway,stop_hook_active+ max-blocks loop guard. Noclaude -p— the primary path keys onrunning→waiting, not process exit.What changed
Built (pull path):
CommitToInbox(last-wins),turn_fingerprint,DrainInboxForParent(atomic, consumed-ledger),DrainForStopHook(+max-blocks guard) wired into the realhook-handlerStop edge,agent-deck inbox drain [--json],DeadLetterSink,WakeNudger, rm_sweep cleanup of all outbox artifacts.Removed (push path):
prepareDispatchbusy-gate,dispatchAsync,scheduleBusyRetry, the deferred queue +DrainRetryQueue, and ack-on-defer (taskworker.go:301). Net −332 lines; producer grep for the old symbols is clean.Surfaces & tests (TDD, RED-first, all 10× race-clean)
cmd/agent-deck/):inbox drain—issue1225_inbox_drain_cli_test.go(happy/json/empty/legacy).internal/session/): producer/consumer/stophook/dead-letter/rm/nudge/durability —issue1225_*_test.go.tests/capability/):TestCapability_Issue1225_BusyParentReceivesCompletionAtTurnBoundarydrives the real compiledhook-handlerend-to-end through the exact 2026-05-29 failure scenario (the headline real-world proof)./tmp/comms-impl/RESULTS.md.Verification:
go test -race ./internal/session/... ./cmd/agent-deck/...green;golangci-lint0 issues;govulncheckno vulnerabilities; capability proof passes.hook-handleremits{decision:"block"}on Stop, but Claude only reads it when the Stop hook runs synchronously. Current install registers Stop asAsync:true(claude_hooks.go). The emit is inert/harmless until a maintainer flips the conductor Stop hook to sync — this is the one install/config change left out.agent-deck inbox drain <self>the first heartbeat step (CLAUDE.md / conductor-template — docs).WakeNudgeris the tested policy core; the linuxinotify(IN_CLOSE_WRITE)watcher that calls it is daemon glue, flagged for follow-up.Commits signed
Committed by Ashesh Goplani.Summary by CodeRabbit
New Features
agent-deck inbox drainwith human and--jsonoutput modesRefactor
Bug Fixes / Safety