Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions codex-rs/core/src/state_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use codex_protocol::ThreadId;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_state::DB_METRIC_BACKFILL;
pub use codex_state::LogEntry;
use codex_state::STATE_DB_FILENAME;
use codex_state::ThreadMetadataBuilder;
use serde_json::Value;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ serde_json = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
Expand Down
14 changes: 14 additions & 0 deletions codex-rs/state/migrations/0002_logs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
ts_nanos INTEGER NOT NULL,
level TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT,
fields_json TEXT NOT NULL,
module_path TEXT,
file TEXT,
line INTEGER
);

CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
2 changes: 2 additions & 0 deletions codex-rs/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
//! orchestration and rollout scanning live in `codex-core`.

mod extract;
pub mod log_db;
mod migrations;
mod model;
mod paths;
mod runtime;

pub use model::LogEntry;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;

Expand Down
162 changes: 162 additions & 0 deletions codex-rs/state/src/log_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//! Tracing log export into the state SQLite database.
//!
//! This module provides a `tracing_subscriber::Layer` that captures events and
//! inserts them into the `logs` table in `state.sqlite`. The writer runs in a
//! background task and batches inserts to keep logging overhead low.
//!
//! ## Usage
//!
//! ```no_run
//! use codex_state::log_db;
//! use tracing_subscriber::prelude::*;
//!
//! # async fn example(state_db: std::sync::Arc<codex_state::StateRuntime>) {
//! let layer = log_db::start(state_db);
//! let _ = tracing_subscriber::registry()
//! .with(layer)
//! .try_init();
//! # }
//! ```

use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use serde_json::Value;
use tokio::sync::mpsc;
use tracing::Event;
use tracing::field::Field;
use tracing::field::Visit;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;

use crate::LogEntry;
use crate::StateRuntime;

const LOG_QUEUE_CAPACITY: usize = 512;
const LOG_BATCH_SIZE: usize = 64;
const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250);

pub struct LogDbLayer {
sender: mpsc::Sender<LogEntry>,
}

pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
tokio::spawn(run_inserter(state_db, receiver));

LogDbLayer { sender }
}

impl<S> Layer<S> for LogDbLayer
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
let metadata = event.metadata();
let mut visitor = JsonVisitor::default();
event.record(&mut visitor);

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0));
let entry = LogEntry {
ts: now.as_secs() as i64,
ts_nanos: now.subsec_nanos() as i64,
level: metadata.level().as_str().to_string(),
target: metadata.target().to_string(),
message: visitor.message,
fields_json: Value::Object(visitor.fields).to_string(),
module_path: metadata.module_path().map(ToString::to_string),
file: metadata.file().map(ToString::to_string),
line: metadata.line().map(|line| line as i64),
};

let _ = self.sender.try_send(entry);
}
}

async fn run_inserter(
state_db: std::sync::Arc<StateRuntime>,
mut receiver: mpsc::Receiver<LogEntry>,
) {
let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE);
let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL);
loop {
tokio::select! {
maybe_entry = receiver.recv() => {
match maybe_entry {
Some(entry) => {
buffer.push(entry);
if buffer.len() >= LOG_BATCH_SIZE {
flush(&state_db, &mut buffer).await;
}
}
None => {
flush(&state_db, &mut buffer).await;
break;
}
}
}
_ = ticker.tick() => {
flush(&state_db, &mut buffer).await;
}
}
}
}

async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntry>) {
if buffer.is_empty() {
return;
}
let entries = buffer.split_off(0);
let _ = state_db.insert_logs(entries.as_slice()).await;
}

#[derive(Default)]
struct JsonVisitor {
fields: serde_json::Map<String, Value>,
message: Option<String>,
}

impl JsonVisitor {
fn record_value(&mut self, field: &Field, value: Value) {
if field.name() == "message" && self.message.is_none() {
self.message = Some(match &value {
Value::String(message) => message.clone(),
_ => value.to_string(),
});
}
self.fields.insert(field.name().to_string(), value);
}
}

impl Visit for JsonVisitor {
fn record_i64(&mut self, field: &Field, value: i64) {
self.record_value(field, Value::from(value));
}

fn record_u64(&mut self, field: &Field, value: u64) {
self.record_value(field, Value::from(value));
}

fn record_bool(&mut self, field: &Field, value: bool) {
self.record_value(field, Value::from(value));
}

fn record_f64(&mut self, field: &Field, value: f64) {
self.record_value(field, Value::from(value));
}

fn record_str(&mut self, field: &Field, value: &str) {
self.record_value(field, Value::from(value));
}

fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
self.record_value(field, Value::from(value.to_string()));
}

fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.record_value(field, Value::from(format!("{value:?}")));
}
}
14 changes: 14 additions & 0 deletions codex-rs/state/src/model/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
pub struct LogEntry {
pub ts: i64,
pub ts_nanos: i64,
pub level: String,
pub target: String,
pub message: Option<String>,
pub fields_json: String,
pub module_path: Option<String>,
pub file: Option<String>,
pub line: Option<i64>,
}
15 changes: 15 additions & 0 deletions codex-rs/state/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
mod log;
mod thread_metadata;

pub use log::LogEntry;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;
pub use thread_metadata::SortKey;
pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;
pub use thread_metadata::ThreadsPage;

pub(crate) use thread_metadata::ThreadRow;
pub(crate) use thread_metadata::anchor_from_item;
pub(crate) use thread_metadata::datetime_to_epoch_seconds;
30 changes: 30 additions & 0 deletions codex-rs/state/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::DB_ERROR_METRIC;
use crate::LogEntry;
use crate::SortKey;
use crate::ThreadMetadata;
use crate::ThreadMetadataBuilder;
Expand Down Expand Up @@ -201,6 +202,35 @@ FROM threads
})
}

/// Insert one log entry into the logs table.
pub async fn insert_log(&self, entry: &LogEntry) -> anyhow::Result<()> {
self.insert_logs(std::slice::from_ref(entry)).await
}

/// Insert a batch of log entries into the logs table.
pub async fn insert_logs(&self, entries: &[LogEntry]) -> anyhow::Result<()> {
if entries.is_empty() {
return Ok(());
}

let mut builder = QueryBuilder::<Sqlite>::new(
"INSERT INTO logs (ts, ts_nanos, level, target, message, fields_json, module_path, file, line) ",
);
builder.push_values(entries, |mut row, entry| {
row.push_bind(entry.ts)
.push_bind(entry.ts_nanos)
.push_bind(&entry.level)
.push_bind(&entry.target)
.push_bind(&entry.message)
.push_bind(&entry.fields_json)
.push_bind(&entry.module_path)
.push_bind(&entry.file)
.push_bind(entry.line);
});
builder.build().execute(self.pool.as_ref()).await?;
Ok(())
}

/// List thread ids using the underlying database (no rollout scanning).
pub async fn list_thread_ids(
&self,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/tui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ codex-file-search = { workspace = true }
codex-login = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-state = { workspace = true }
codex-utils-absolute-path = { workspace = true }
color-eyre = { workspace = true }
crossterm = { workspace = true, features = ["bracketed-paste", "event-stream"] }
Expand Down
6 changes: 6 additions & 0 deletions codex-rs/tui/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_state::log_db;
use codex_utils_absolute_path::AbsolutePathBuf;
use cwd_prompt::CwdPromptAction;
use cwd_prompt::CwdSelection;
Expand Down Expand Up @@ -351,10 +352,15 @@ pub async fn run_main(

let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());

let log_db_layer = codex_core::state_db::init_if_enabled(&config, None)
.await
.map(|db| log_db::start(db).with_filter(env_filter()));

let _ = tracing_subscriber::registry()
.with(file_layer)
.with(feedback_layer)
.with(feedback_metadata_layer)
.with(log_db_layer)
.with(otel_logger_layer)
.with(otel_tracing_layer)
.try_init();
Expand Down
Loading