Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use tracing::debug;
use tracing::error;
use tracing::field;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::trace_span;
use tracing::warn;
Expand Down Expand Up @@ -353,6 +354,7 @@ impl Codex {
let session_source_clone = session_configuration.session_source.clone();
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);

let session_init_span = info_span!("session_init");
let session = Session::new(
session_configuration,
config.clone(),
Expand All @@ -366,6 +368,7 @@ impl Codex {
skills_manager,
agent_control,
)
.instrument(session_init_span)
.await
.map_err(|e| {
error!("Failed to create session: {e:#}");
Expand All @@ -374,7 +377,10 @@ impl Codex {
let thread_id = session.conversation_id;

// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(Arc::clone(&session), config, rx_sub));
let session_loop_span = info_span!("session_loop", thread_id = %thread_id);
tokio::spawn(
submission_loop(Arc::clone(&session), config, rx_sub).instrument(session_loop_span),
);
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
Expand Down
28 changes: 17 additions & 11 deletions codex-rs/core/src/shell_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use tokio::fs;
use tokio::process::Command;
use tokio::sync::watch;
use tokio::time::timeout;
use tracing::Instrument;
use tracing::info_span;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShellSnapshot {
Expand All @@ -42,17 +44,21 @@ impl ShellSnapshot {

let snapshot_shell = shell.clone();
let snapshot_session_id = session_id;
tokio::spawn(async move {
let timer = otel_manager.start_timer("codex.shell_snapshot.duration_ms", &[]);
let snapshot =
ShellSnapshot::try_new(&codex_home, snapshot_session_id, &snapshot_shell)
.await
.map(Arc::new);
let success = if snapshot.is_some() { "true" } else { "false" };
let _ = timer.map(|timer| timer.record(&[("success", success)]));
otel_manager.counter("codex.shell_snapshot", 1, &[("success", success)]);
let _ = shell_snapshot_tx.send(snapshot);
});
let snapshot_span = info_span!("shell_snapshot", thread_id = %snapshot_session_id);
tokio::spawn(
async move {
let timer = otel_manager.start_timer("codex.shell_snapshot.duration_ms", &[]);
let snapshot =
ShellSnapshot::try_new(&codex_home, snapshot_session_id, &snapshot_shell)
.await
.map(Arc::new);
let success = if snapshot.is_some() { "true" } else { "false" };
let _ = timer.map(|timer| timer.record(&[("success", success)]));
otel_manager.counter("codex.shell_snapshot", 1, &[("success", success)]);
let _ = shell_snapshot_tx.send(snapshot);
}
.instrument(snapshot_span),
);
}

async fn try_new(codex_home: &Path, session_id: ThreadId, shell: &Shell) -> Option<Self> {
Expand Down
41 changes: 24 additions & 17 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::Instrument;
use tracing::info_span;
use tracing::trace;
use tracing::warn;

Expand Down Expand Up @@ -130,25 +132,30 @@ impl Session {
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
tokio::spawn(async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_cancellation_token.child_token(),
)
.await;
session_ctx.clone_session().flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();
sess.on_task_finished(ctx_for_finish, last_agent_message)
let thread_id = self.conversation_id;
let session_span = info_span!("session_task", thread_id = %thread_id);
tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_cancellation_token.child_token(),
)
.await;
session_ctx.clone_session().flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();
sess.on_task_finished(ctx_for_finish, last_agent_message)
.await;
}
done_clone.notify_waiters();
}
done_clone.notify_waiters();
})
.instrument(session_span),
)
};

let timer = turn_context
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/state/migrations/0002_logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ CREATE TABLE logs (
level TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT,
thread_id TEXT,
module_path TEXT,
file TEXT,
line INTEGER
);

CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
CREATE INDEX idx_logs_thread_id ON logs(thread_id);
20 changes: 18 additions & 2 deletions codex-rs/state/src/bin/logs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ struct Args {
#[arg(long)]
file: Option<String>,

/// Match a specific thread id.
#[arg(long)]
thread_id: Option<String>,

/// Number of matching rows to show before tailing.
#[arg(long, default_value_t = 200)]
backfill: usize,
Expand All @@ -65,6 +69,7 @@ struct LogRow {
ts_nanos: i64,
level: String,
message: Option<String>,
thread_id: Option<String>,
file: Option<String>,
line: Option<i64>,
}
Expand All @@ -76,6 +81,7 @@ struct LogFilter {
to_ts: Option<i64>,
module_like: Option<String>,
file_like: Option<String>,
thread_id: Option<String>,
}

#[tokio::main]
Expand Down Expand Up @@ -139,6 +145,7 @@ fn build_filter(args: &Args) -> anyhow::Result<LogFilter> {
to_ts,
module_like: args.module.clone(),
file_like: args.file.clone(),
thread_id: args.thread_id.clone(),
})
}

Expand Down Expand Up @@ -236,7 +243,7 @@ async fn fetch_max_id(pool: &SqlitePool, filter: &LogFilter) -> anyhow::Result<i

fn base_select_builder<'a>() -> QueryBuilder<'a, Sqlite> {
QueryBuilder::<Sqlite>::new(
"SELECT id, ts, ts_nanos, level, message, file, line FROM logs WHERE 1 = 1",
"SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
)
}

Expand Down Expand Up @@ -264,6 +271,11 @@ fn push_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, filter: &'a LogFilte
.push_bind(file_like.as_str())
.push(" || '%'");
}
if let Some(thread_id) = filter.thread_id.as_ref() {
builder
.push(" AND thread_id = ")
.push_bind(thread_id.as_str());
}
}

fn format_row(row: &LogRow) -> String {
Expand All @@ -277,9 +289,13 @@ fn format_row(row: &LogRow) -> String {
let message = row.message.as_deref().unwrap_or("");
let level_colored = color_level(level);
let timestamp_colored = timestamp.dimmed().to_string();
let thread_id = row.thread_id.as_deref().unwrap_or("-");
let thread_id_colored = thread_id.yellow().to_string();
let location_colored = location.dimmed().to_string();
let message_colored = message.bold().to_string();
format!("{timestamp_colored} {level_colored} {location_colored} - {message_colored}")
format!(
"{timestamp_colored} {level_colored} [{thread_id_colored}] {location_colored} - {message_colored}"
)
}

fn color_level(level: &str) -> String {
Expand Down
Loading
Loading