Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions codex-rs/core/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@
"minimum": 0.0,
"type": "integer"
},
"max_unused_days": {
"description": "Maximum number of days since a memory was last used before it becomes ineligible for phase 2 selection.",
"format": "int64",
"type": "integer"
},
"min_rollout_idle_hours": {
"description": "Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.",
"format": "int64",
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,7 @@ persistence = "none"
let memories = r#"
[memories]
max_raw_memories_for_global = 512
max_unused_days = 21
max_rollout_age_days = 42
max_rollouts_per_startup = 9
min_rollout_idle_hours = 24
Expand All @@ -2478,6 +2479,7 @@ phase_2_model = "gpt-5"
assert_eq!(
Some(MemoriesToml {
max_raw_memories_for_global: Some(512),
max_unused_days: Some(21),
max_rollout_age_days: Some(42),
max_rollouts_per_startup: Some(9),
min_rollout_idle_hours: Some(24),
Expand All @@ -2497,6 +2499,7 @@ phase_2_model = "gpt-5"
config.memories,
MemoriesConfig {
max_raw_memories_for_global: 512,
max_unused_days: 21,
max_rollout_age_days: 42,
max_rollouts_per_startup: 9,
min_rollout_idle_hours: 24,
Expand Down
9 changes: 9 additions & 0 deletions codex-rs/core/src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;

#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -363,6 +364,8 @@ pub struct FeedbackConfigToml {
pub struct MemoriesToml {
/// Maximum number of recent raw memories retained for global consolidation.
pub max_raw_memories_for_global: Option<usize>,
/// Maximum number of days since a memory was last used before it becomes ineligible for phase 2 selection.
pub max_unused_days: Option<i64>,
/// Maximum age of the threads used for memories.
pub max_rollout_age_days: Option<i64>,
/// Maximum number of rollout candidates processed per pass.
Expand All @@ -379,6 +382,7 @@ pub struct MemoriesToml {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoriesConfig {
pub max_raw_memories_for_global: usize,
pub max_unused_days: i64,
pub max_rollout_age_days: i64,
pub max_rollouts_per_startup: usize,
pub min_rollout_idle_hours: i64,
Expand All @@ -390,6 +394,7 @@ impl Default for MemoriesConfig {
fn default() -> Self {
Self {
max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
max_unused_days: DEFAULT_MEMORIES_MAX_UNUSED_DAYS,
max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS,
max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS,
Expand All @@ -407,6 +412,10 @@ impl From<MemoriesToml> for MemoriesConfig {
.max_raw_memories_for_global
.unwrap_or(defaults.max_raw_memories_for_global)
.min(4096),
max_unused_days: toml
.max_unused_days
.unwrap_or(defaults.max_unused_days)
.clamp(0, 365),
max_rollout_age_days: toml
.max_rollout_age_days
.unwrap_or(defaults.max_rollout_age_days)
Expand Down
9 changes: 8 additions & 1 deletion codex-rs/core/src/memories/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ Phase 2 consolidates the latest stage-1 outputs into the filesystem memory artif
What it does:

- claims a single global phase-2 job (so only one consolidation runs at a time)
- loads a bounded set of the most recent stage-1 outputs from the state DB (the per-rollout memories produced by Phase 1, used as the consolidation input set)
- loads a bounded set of stage-1 outputs from the state DB using phase-2
  selection rules:
  - ignores memories whose `last_usage` falls outside the configured
    `max_unused_days` window
  - for memories with no `last_usage`, falls back to `generated_at` so fresh
    never-used memories can still be selected
  - ranks eligible memories by `usage_count` first, then by the most recent
    `last_usage` / `generated_at`
- computes a completion watermark from the claimed watermark + newest input timestamps
- syncs local memory artifacts under the memories root:
  - `raw_memories.md` (merged raw memories, latest first)
Expand Down
6 changes: 5 additions & 1 deletion codex-rs/core/src/memories/phase2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
};
let root = memory_root(&config.codex_home);
let max_raw_memories = config.memories.max_raw_memories_for_global;
let max_unused_days = config.memories.max_unused_days;

// 1. Claim the job.
let claim = match job::claim(session, db).await {
Expand All @@ -76,7 +77,10 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
};

// 3. Query the memories
let selection = match db.get_phase2_input_selection(max_raw_memories).await {
let selection = match db
.get_phase2_input_selection(max_raw_memories, max_unused_days)
.await
{
Ok(selection) => selection,
Err(err) => {
tracing::error!("failed to list stage1 outputs from global: {}", err);
Expand Down
14 changes: 10 additions & 4 deletions codex-rs/core/src/memories/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ mod phase2 {
#[tokio::test]
async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(100).await;
harness.seed_stage1_output(Utc::now().timestamp()).await;

let stale_claim = harness
.state_db
Expand All @@ -573,12 +573,18 @@ mod phase2 {

phase2::run(&harness.session, Arc::clone(&harness.config)).await;

let running_claim = harness
let post_dispatch_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim while running");
pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning);
.expect("claim after stale lock dispatch");
assert!(
matches!(
post_dispatch_claim,
Phase2JobClaimOutcome::SkippedRunning | Phase2JobClaimOutcome::SkippedNotDirty
),
"stale-lock dispatch should either keep the reclaimed job running or finish it before re-claim"
);

let user_input_ops = harness.user_input_ops_count();
pretty_assertions::assert_eq!(user_input_ops, 1);
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/suite/memories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ async fn wait_for_phase2_success(
) -> Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let selection = db.get_phase2_input_selection(1).await?;
let selection = db.get_phase2_input_selection(1, 30).await?;
if selection.selected.len() == 1
&& selection.selected[0].thread_id == expected_thread_id
&& selection.retained_thread_ids == vec![expected_thread_id]
Expand Down
Loading
Loading