[Gastown] PR 5: Rig DO Alarm — Work Scheduler #212

@jrf0110

Description

Parent: #204 | Phase 1: Single Rig, Single Polecat

Revised: This was previously "tRPC Routes — Town & Rig Management." The tRPC routes have been moved to a new issue. This issue is now about the Rig DO alarm — the core scheduler that drives the system.

Goal

The Rig DO becomes the active scheduler. Alarms periodically scan state and signal the container to start/stop agent processes. This is the "DO is the brain, container is the muscle" model.

Alarm Handler

async alarm(): Promise<void> {
  await this.schedulePendingWork();
  await this.witnessPatrol();
  await this.processReviewQueue();

  // Re-arm: 30s while there is active work, 5 min when idle.
  // (If a step above throws, the runtime retries alarm() with backoff.)
  const hasActiveWork = this.hasActiveAgentsOrPendingBeads();
  const nextAlarmMs = hasActiveWork ? 30_000 : 300_000;
  await this.ctx.storage.setAlarm(Date.now() + nextAlarmMs);
}
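In the handler above, a throw from one step skips the remaining steps for that tick. The following is a hedged sketch, not the issue's design, of running each step in isolation and then choosing the adaptive interval. `runAlarmTick`, `AlarmStep`, `TickResult`, and the interval constants are hypothetical names.

```typescript
// Hypothetical sketch: run each alarm step in isolation, then pick the delay.
// The interval values mirror the 30s active / 5 min idle rule in the issue.
const ACTIVE_INTERVAL_MS = 30_000;
const IDLE_INTERVAL_MS = 300_000;

interface AlarmStep {
  name: string;
  run: () => Promise<void>;
}

interface TickResult {
  failures: { name: string; error: unknown }[]; // steps that threw
  nextDelayMs: number; // pass to setAlarm(Date.now() + nextDelayMs)
}

async function runAlarmTick(
  steps: AlarmStep[],
  hasActiveWork: () => boolean,
): Promise<TickResult> {
  const failures: { name: string; error: unknown }[] = [];
  for (const step of steps) {
    try {
      await step.run();
    } catch (error) {
      // Record and keep going; the step will run again on the next tick.
      failures.push({ name: step.name, error });
    }
  }
  // Check activity after the steps ran, since they may have changed state.
  const nextDelayMs = hasActiveWork() ? ACTIVE_INTERVAL_MS : IDLE_INTERVAL_MS;
  return { failures, nextDelayMs };
}
```

Inside alarm() this would be called with the three patrol methods, followed by `await this.ctx.storage.setAlarm(Date.now() + nextDelayMs)`. Letting errors propagate is also defensible, since the Durable Objects runtime retries a throwing alarm() with exponential backoff; isolating steps mainly keeps one failing step from delaying the others.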

schedulePendingWork() — Dispatch beads to agents

Find agents that are idle (not yet started in the container) but have a hooked bead, and signal the container to start them:

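async schedulePendingWork(): Promise<void> {
  // Join on the hooked bead itself so each idle agent yields at most one row
  // (joining on assignee alone returns one row per in-progress bead).
  const pendingAgents = this.ctx.storage.sql.exec(
    `SELECT a.*, b.id AS bead_id, b.title AS bead_title
     FROM agents a
     JOIN beads b ON b.id = a.current_hook_bead_id
     WHERE a.status = 'idle'
       AND b.status = 'in_progress'
       AND b.assignee_agent_id = a.id`
  ).toArray();

  for (const agent of pendingAgents) {
    try {
      // startAgentInContainer() is expected to move the agent out of 'idle';
      // otherwise the next alarm would dispatch it again.
      await this.startAgentInContainer(agent);
    } catch (err) {
      // Leave the agent idle so the next alarm retries; keep dispatching others.
      console.error(`Failed to start agent ${agent.id}`, err);
    }
  }
}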
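To make the dispatch rule testable apart from SQLite, here is a hypothetical in-memory version of it: an idle agent whose hooked bead is `in_progress` and assigned to that agent. The `Agent`, `Bead`, and `Dispatch` shapes and `findPendingDispatches` are assumptions that model only the columns the query reads.

```typescript
// Hypothetical row shapes: only the columns the dispatch query reads.
interface Agent {
  id: string;
  status: string; // e.g. 'idle', 'working', 'blocked'
  current_hook_bead_id: string | null;
}

interface Bead {
  id: string;
  status: string; // e.g. 'in_progress'
  assignee_agent_id: string | null;
}

interface Dispatch {
  agentId: string;
  beadId: string;
}

// Returns at most one dispatch per agent: keyed on the hooked bead,
// which must be in progress and assigned to the same agent.
function findPendingDispatches(agents: Agent[], beads: Bead[]): Dispatch[] {
  const beadsById = new Map(beads.map((b) => [b.id, b] as const));
  const out: Dispatch[] = [];
  for (const agent of agents) {
    if (agent.status !== "idle" || agent.current_hook_bead_id === null) continue;
    const bead = beadsById.get(agent.current_hook_bead_id);
    if (bead && bead.status === "in_progress" && bead.assignee_agent_id === agent.id) {
      out.push({ agentId: agent.id, beadId: bead.id });
    }
  }
  return out;
}
```

Keying on `current_hook_bead_id` rather than on assignee alone means an agent with several in-progress beads still produces a single dispatch.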

witnessPatrol() — Health monitoring

The existing witnessPatrol() method is now called from the alarm. It now checks container process health instead of cloud-agent-next session health:

async witnessPatrol(): Promise<void> {
  const workingAgents = this.ctx.storage.sql.exec(
    `SELECT * FROM agents WHERE status IN ('working', 'blocked')`
  ).toArray();
  if (workingAgents.length === 0) return;

  // One container per town: resolve the stub once, outside the loop.
  const container = this.env.TOWN_CONTAINER.get(
    this.env.TOWN_CONTAINER.idFromName(this.townId)
  );
  const GUPP_THRESHOLD_MS = 30 * 60 * 1000; // 30 min with no activity

  for (const agent of workingAgents) {
    // Check whether the agent process is alive in the container
    let status: string;
    try {
      const statusRes = await container.fetch(`http://container/agents/${agent.id}/status`);
      status = ((await statusRes.json()) as { status: string }).status;
    } catch {
      continue; // Container unreachable or bad response; re-check next patrol
    }

    if (status === 'not_found' || status === 'exited') {
      if (agent.current_hook_bead_id) {
        await this.restartAgent(agent); // Re-dispatch with checkpoint
      } else {
        this.updateAgentStatus(agent.id, 'idle');
      }
      continue;
    }

    // GUPP violation check: hooked work with no progress for 30+ minutes.
    // Note: this re-sends on every patrol while the agent stays stale.
    if (agent.last_activity_at) {
      const staleMs = Date.now() - new Date(agent.last_activity_at as string).getTime();
      if (staleMs > GUPP_THRESHOLD_MS) {
        await this.sendMail({
          from_agent_id: 'witness',
          to_agent_id: agent.id,
          subject: 'GUPP_CHECK',
          body: 'You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.',
        });
      }
    }
  }
}
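The patrol mixes I/O (container fetches, mail) with decision rules. A hedged sketch of pulling the rules into a pure function, so the dead-process and GUPP thresholds can be unit-tested without a container. `decidePatrolAction`, `PatrolAction`, and `PatrolInput` are hypothetical, and any container status other than `not_found` or `exited` is assumed to mean the process is alive.

```typescript
// Mirrors the 30-minute GUPP threshold in witnessPatrol().
const GUPP_STALE_MS = 30 * 60 * 1000;

// Hypothetical action set for one agent on one patrol.
type PatrolAction = "restart" | "mark_idle" | "gupp_check" | "ok";

interface PatrolInput {
  containerStatus: string; // 'not_found' and 'exited' mean the process is gone
  currentHookBeadId: string | null;
  lastActivityAt: string | null; // ISO timestamp from the agents table
}

function decidePatrolAction(input: PatrolInput, nowMs: number): PatrolAction {
  if (input.containerStatus === "not_found" || input.containerStatus === "exited") {
    // Dead process: re-dispatch if it still holds work, otherwise free the agent.
    return input.currentHookBeadId ? "restart" : "mark_idle";
  }
  if (input.lastActivityAt) {
    const staleMs = nowMs - new Date(input.lastActivityAt).getTime();
    if (staleMs > GUPP_STALE_MS) return "gupp_check";
  }
  return "ok";
}
```

witnessPatrol() would then only fetch the status, call this function, and perform the side effect for the returned action.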

processReviewQueue() — Trigger merge/refinery

async processReviewQueue(): Promise<void> {
  const pendingEntry = this.popReviewQueue();
  if (!pendingEntry) return;
  // Phase 1: deterministic git merge via container
  // Phase 2: start refinery agent in container
  await this.startMergeInContainer(pendingEntry);
}
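The issue does not specify popReviewQueue(). As one possible contract, sketched with hypothetical names (`ReviewQueue`, `ReviewEntry`, and the status values are assumptions), the pop takes the oldest pending entry and marks it `merging` instead of deleting it, so a merge that fails in the container can be requeued rather than lost.

```typescript
// Hypothetical review-queue states for one entry.
type ReviewStatus = "pending" | "merging" | "merged" | "failed";

interface ReviewEntry {
  id: string;
  beadId: string;
  enqueuedAt: number; // ms since epoch
  status: ReviewStatus;
}

class ReviewQueue {
  private entries: ReviewEntry[] = [];

  enqueue(id: string, beadId: string, enqueuedAt: number): void {
    this.entries.push({ id, beadId, enqueuedAt, status: "pending" });
  }

  // Oldest pending entry first (FIFO); null when nothing is pending.
  pop(): ReviewEntry | null {
    let oldest: ReviewEntry | null = null;
    for (const e of this.entries) {
      if (e.status === "pending" && (oldest === null || e.enqueuedAt < oldest.enqueuedAt)) {
        oldest = e;
      }
    }
    if (oldest) oldest.status = "merging"; // claimed, but not deleted
    return oldest;
  }

  // Record the merge outcome reported by the container.
  complete(id: string, ok: boolean): void {
    const e = this.entries.find((x) => x.id === id);
    if (e) e.status = ok ? "merged" : "failed";
  }

  // Return a stuck or failed entry to the queue for a later alarm.
  requeue(id: string): void {
    const e = this.entries.find((x) => x.id === id);
    if (e && (e.status === "merging" || e.status === "failed")) e.status = "pending";
  }
}
```

In the Rig DO the same transitions would be SQL UPDATEs on a review-queue table; the in-memory array only keeps the example self-contained.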

Alarm Activation

The alarm is armed when:

  • A new bead is assigned to an agent (hookBead)
  • An agent calls agentDone (to process review queue)
  • Container reports an agent process has exited
  • Health check endpoint is called
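
Each of these paths calls armAlarmIfNeeded(), which schedules an alarm 5 seconds out only if none is already pending:

private async armAlarmIfNeeded(): Promise<void> {
  // getAlarm() returns a Promise; without await the check is always truthy
  const currentAlarm = await this.ctx.storage.getAlarm();
  if (currentAlarm === null) {
    await this.ctx.storage.setAlarm(Date.now() + 5_000);
  }
}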
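To check the arming logic without a Durable Object, here is a minimal sketch against a fake storage object. `AlarmStorage`, `FakeAlarmStorage`, and `ARM_DELAY_MS` are hypothetical stand-ins for the two `DurableObjectStorage` methods used (`getAlarm()` and `setAlarm()`, both asynchronous in the real API).

```typescript
// Hypothetical subset of the storage API used for arming.
interface AlarmStorage {
  getAlarm(): Promise<number | null>;
  setAlarm(scheduledTime: number): Promise<void>;
}

// In-memory fake that counts setAlarm() calls for testing.
class FakeAlarmStorage implements AlarmStorage {
  alarmAt: number | null = null;
  setCalls = 0;

  async getAlarm(): Promise<number | null> {
    return this.alarmAt;
  }

  async setAlarm(scheduledTime: number): Promise<void> {
    this.setCalls++;
    this.alarmAt = scheduledTime;
  }
}

const ARM_DELAY_MS = 5_000;

// Arms the alarm only when none is pending; returns true if it armed.
// Awaiting getAlarm() matters: a bare Promise is always truthy.
async function armAlarmIfNeeded(storage: AlarmStorage, nowMs: number): Promise<boolean> {
  const current = await storage.getAlarm();
  if (current !== null) return false;
  await storage.setAlarm(nowMs + ARM_DELAY_MS);
  return true;
}
```

One consequence of "arm only if none is pending": while the 5-minute idle alarm is scheduled, a newly hooked bead waits for it. If faster pickup matters, a variant could compare the pending time with `nowMs + ARM_DELAY_MS` and keep the earlier of the two.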

Note: This issue subsumes the witness alarm functionality from the old #217. The witness patrol is now part of the Rig DO alarm handler rather than a separate PR.

Dependencies

  • PR 1 (Rig DO)
  • PR 4 (Town Container — for fetch() communication)

Acceptance Criteria

  • alarm() handler implemented in Rig DO
  • schedulePendingWork() dispatches idle agents to container
  • witnessPatrol() checks container process health, restarts dead agents, detects GUPP violations
  • processReviewQueue() triggers merge processing
  • Alarm auto-arms on bead assignment, agent done, and health check
  • Adaptive alarm interval (30s active, 5 min idle)
  • Dead agent detection and restart with checkpoint recovery
  • GUPP violation mail sent to stale agents
  • Integration test: bead created → alarm fires → container starts agent
