Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func New(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts ..
startupEvents := make(chan runtime.Event, 10)
go func() {
defer close(startupEvents)
rt.EmitStartupInfo(ctx, startupEvents)
rt.EmitStartupInfo(ctx, sess, startupEvents)
}()
for event := range startupEvents {
select {
Expand Down Expand Up @@ -259,7 +259,7 @@ func (a *App) ResolveCommand(ctx context.Context, userInput string) string {

// EmitStartupInfo emits initial agent, team, and toolset information to the provided channel
func (a *App) EmitStartupInfo(ctx context.Context, events chan runtime.Event) {
a.runtime.EmitStartupInfo(ctx, events)
a.runtime.EmitStartupInfo(ctx, a.session, events)
}

// Run one agent loop
Expand Down Expand Up @@ -622,7 +622,7 @@ func (a *App) SetCurrentAgentModel(ctx context.Context, modelRef string) error {
startupEvents := make(chan runtime.Event, 10)
go func() {
defer close(startupEvents)
a.runtime.EmitStartupInfo(ctx, startupEvents)
a.runtime.EmitStartupInfo(ctx, a.session, startupEvents)
}()
for event := range startupEvents {
select {
Expand Down Expand Up @@ -796,7 +796,7 @@ func (a *App) ReplaceSession(ctx context.Context, sess *session.Session) {
startupEvents := make(chan runtime.Event, 10)
go func() {
defer close(startupEvents)
a.runtime.EmitStartupInfo(ctx, startupEvents)
a.runtime.EmitStartupInfo(ctx, a.session, startupEvents)
}()
for event := range startupEvents {
select {
Expand Down
6 changes: 4 additions & 2 deletions pkg/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ func (m *mockRuntime) SetCurrentAgent(name string) error { return nil }
// CurrentAgentTools is a stub for the mock runtime: it ignores ctx and always
// reports no tools and no error.
func (m *mockRuntime) CurrentAgentTools(ctx context.Context) ([]tools.Tool, error) {
return nil, nil
}
func (m *mockRuntime) EmitStartupInfo(ctx context.Context, events chan runtime.Event) {}
func (m *mockRuntime) ResetStartupInfo() {}

func (m *mockRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events chan runtime.Event) {
}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) RunStream(ctx context.Context, sess *session.Session) <-chan runtime.Event {
ch := make(chan runtime.Event)
close(ch)
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/export/html.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func sessionToData(sess *session.Session) SessionData {
CreatedAt: sess.CreatedAt,
InputTokens: sess.InputTokens,
OutputTokens: sess.OutputTokens,
Cost: sess.Cost,
Cost: sess.TotalCost(),
Messages: exportMessages,
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func (m *mockRuntime) CurrentAgentInfo(context.Context) CurrentAgentInfo {
// SetCurrentAgent is a no-op stub: the agent name is ignored and nil is always
// returned.
func (m *mockRuntime) SetCurrentAgent(string) error {
return nil
}
func (m *mockRuntime) EmitStartupInfo(context.Context, chan Event) {}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, chan Event) {}
func (m *mockRuntime) ResetStartupInfo() {}
// RunStream is a stub that ignores its arguments and returns a nil channel.
// NOTE(review): receiving from a nil channel blocks forever, so presumably no
// test in this package drains the result — confirm before relying on it.
func (m *mockRuntime) RunStream(context.Context, *session.Session) <-chan Event {
return nil
}
Expand Down
32 changes: 17 additions & 15 deletions pkg/runtime/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,26 +258,28 @@ type MessageUsage struct {
Model string
}

func TokenUsage(sessionID, agentName string, inputTokens, outputTokens, contextLength, contextLimit int64, cost float64) Event {
return TokenUsageWithMessage(sessionID, agentName, inputTokens, outputTokens, contextLength, contextLimit, cost, nil)
}

func TokenUsageWithMessage(sessionID, agentName string, inputTokens, outputTokens, contextLength, contextLimit int64, cost float64, msgUsage *MessageUsage) Event {
// NewTokenUsageEvent creates a TokenUsageEvent with the given usage data.
func NewTokenUsageEvent(sessionID, agentName string, usage *Usage) Event {
return &TokenUsageEvent{
Type: "token_usage",
SessionID: sessionID,
Usage: &Usage{
ContextLength: contextLength,
ContextLimit: contextLimit,
InputTokens: inputTokens,
OutputTokens: outputTokens,
Cost: cost,
LastMessage: msgUsage,
},
Type: "token_usage",
SessionID: sessionID,
Usage: usage,
AgentContext: newAgentContext(agentName),
}
}

// SessionUsage snapshots a session's token accounting into a newly allocated
// Usage. ContextLength is reported as input plus output tokens, ContextLimit is
// passed through unchanged from the caller, and Cost is the value returned by
// sess.OwnCost(). LastMessage is left unset; callers that have per-message
// usage attach it to the returned value themselves.
func SessionUsage(sess *session.Session, contextLimit int64) *Usage {
	in, out := sess.InputTokens, sess.OutputTokens

	usage := new(Usage)
	usage.InputTokens = in
	usage.OutputTokens = out
	usage.ContextLength = in + out
	usage.ContextLimit = contextLimit
	usage.Cost = sess.OwnCost()
	return usage
}

type SessionTitleEvent struct {
Type string `json:"type"`
SessionID string `json:"session_id"`
Expand Down
5 changes: 4 additions & 1 deletion pkg/runtime/persistent_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ func (r *PersistentRuntime) handleEvent(ctx context.Context, sess *session.Sessi
}

case *TokenUsageEvent:
if e.Usage != nil {
// Only persist token usage for the current session.
// During task transfers, sub-session events flow through but should
// not overwrite the parent session's token counts.
if e.Usage != nil && e.SessionID == sess.ID {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium: RAG token events are filtered out and not persisted

The condition e.SessionID == sess.ID filters out RAG token usage events because RAG events use an empty SessionID (see runtime.go line ~388: NewTokenUsageEvent("", agentName, ...)).

When RAG operations occur during a session:

  • RAG events have SessionID = ""
  • The current session has sess.ID = "some-uuid"
  • The comparison "" == "some-uuid" is FALSE
  • Result: RAG token usage is never persisted via UpdateSessionTokens()

This means RAG costs are displayed in real-time but not saved to the database, causing session cost reports to undercount actual usage.

Recommendation: do one of the following:

  1. Pass the actual session ID when creating RAG events, or
  2. Add special handling for an empty SessionID:

       if e.Usage != nil && (e.SessionID == sess.ID || e.SessionID == "") {

     or
  3. Accumulate RAG costs directly in the session instead of relying on event-based persistence

if err := r.sessionStore.UpdateSessionTokens(ctx, sess.ID, e.Usage.InputTokens, e.Usage.OutputTokens, e.Usage.Cost); err != nil {
slog.Warn("Failed to persist token usage", "session_id", sess.ID, "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/remote_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *RemoteRuntime) CurrentAgentTools(_ context.Context) ([]tools.Tool, erro
}

// EmitStartupInfo emits initial agent, team, and toolset information
func (r *RemoteRuntime) EmitStartupInfo(ctx context.Context, events chan Event) {
func (r *RemoteRuntime) EmitStartupInfo(ctx context.Context, _ *session.Session, events chan Event) {
cfg := r.readCurrentAgentConfig(ctx)

events <- AgentInfo(r.currentAgent, cfg.Model, cfg.Description, cfg.WelcomeMessage)
Expand Down
78 changes: 50 additions & 28 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ type Runtime interface {
SetCurrentAgent(agentName string) error
// CurrentAgentTools returns the tools for the active agent
CurrentAgentTools(ctx context.Context) ([]tools.Tool, error)
// EmitStartupInfo emits initial agent, team, and toolset information for immediate display
EmitStartupInfo(ctx context.Context, events chan Event)
// EmitStartupInfo emits initial agent, team, and toolset information for immediate display.
// When sess is non-nil and contains token data, a TokenUsageEvent is also emitted
// so the UI can display context usage percentage on session restore.
EmitStartupInfo(ctx context.Context, sess *session.Session, events chan Event)
// ResetStartupInfo resets the startup info emission flag, allowing re-emission
ResetStartupInfo()
// RunStream starts the agent's interaction loop and returns a channel of events
Expand Down Expand Up @@ -379,15 +381,11 @@ func (r *LocalRuntime) forwardRAGEvents(ctx context.Context, ragManagers map[str
sendEvent(RAGIndexingCompleted(ragName, ragEvent.StrategyName, agentName))
case ragtypes.EventTypeUsage:
// Convert RAG usage to TokenUsageEvent so TUI displays it
sendEvent(TokenUsage(
"",
agentName,
ragEvent.TotalTokens, // input tokens (embeddings)
0, // output tokens (0 for embeddings)
ragEvent.TotalTokens, // context length
0, // context limit (not applicable)
ragEvent.Cost,
))
sendEvent(NewTokenUsageEvent("", agentName, &Usage{
InputTokens: ragEvent.TotalTokens,
ContextLength: ragEvent.TotalTokens,
Cost: ragEvent.Cost,
}))
case ragtypes.EventTypeError:
if ragEvent.Error != nil {
sendEvent(Error(fmt.Sprintf("RAG %s error: %v", ragName, ragEvent.Error)))
Expand Down Expand Up @@ -714,8 +712,10 @@ func (r *LocalRuntime) ResetStartupInfo() {
r.startupInfoEmitted = false
}

// EmitStartupInfo emits initial agent, team, and toolset information for immediate sidebar display
func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, events chan Event) {
// EmitStartupInfo emits initial agent, team, and toolset information for immediate sidebar display.
// When sess is non-nil and contains token data, a TokenUsageEvent is also emitted so that the
// sidebar can display context usage percentage on session restore.
func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events chan Event) {
// Prevent duplicate emissions
if r.startupInfoEmitted {
return
Expand All @@ -736,13 +736,32 @@ func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, events chan Event) {

// Emit agent and team information immediately for fast sidebar display
// Use getEffectiveModelID to account for active fallback cooldowns
if !send(AgentInfo(a.Name(), r.getEffectiveModelID(a), a.Description(), a.WelcomeMessage())) {
modelID := r.getEffectiveModelID(a)
if !send(AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage())) {
return
}
if !send(TeamInfo(r.agentDetailsFromTeam(), r.currentAgent)) {
return
}

// When restoring a session that already has token data, emit a
// TokenUsageEvent so the sidebar can show the context usage percentage.
// The context limit comes from the model definition (models.dev), which
// is a model property — not persisted in the session.
//
// Use TotalCost (not OwnCost) because this is a restore/branch context:
// sub-sessions won't emit their own events, so the parent must include
// their costs.
if sess != nil && (sess.InputTokens > 0 || sess.OutputTokens > 0) {
var contextLimit int64
if m, err := r.modelsStore.GetModel(ctx, modelID); err == nil && m != nil {
contextLimit = int64(m.Limit.Context)
}
usage := SessionUsage(sess, contextLimit)
usage.Cost = sess.TotalCost()
send(NewTokenUsageEvent(sess.ID, r.currentAgent, usage))
}

// Emit agent warnings (if any) - these are quick
r.emitAgentWarningsWithSend(a, send)

Expand Down Expand Up @@ -1001,9 +1020,9 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
}

if m != nil && r.sessionCompaction {
if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) {
contextLength := sess.InputTokens + sess.OutputTokens
if contextLength > int64(float64(contextLimit)*0.9) {
r.Summarize(ctx, sess, "", events)
events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost)
}
}

Expand Down Expand Up @@ -1107,7 +1126,9 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
slog.Debug("Skipping empty assistant message (no content and no tool calls)", "agent", a.Name())
}

events <- TokenUsageWithMessage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost, msgUsage)
usage := SessionUsage(sess, contextLimit)
usage.LastMessage = msgUsage
events <- NewTokenUsageEvent(sess.ID, r.currentAgent, usage)

r.processToolCalls(ctx, sess, res.Calls, agentTools, events)

Expand Down Expand Up @@ -1273,7 +1294,6 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
var actualModelEventEmitted bool
var messageUsage *chat.Usage
var messageRateLimit *chat.RateLimit
var prevStreamCost float64 // cost contributed by previous usage emission in this stream

modelID := getAgentModelID(a)
toolCallIndex := make(map[string]int) // toolCallID -> index in toolCalls slice
Expand All @@ -1295,23 +1315,14 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
if response.Usage != nil {
messageUsage = response.Usage

if m != nil && m.Cost != nil {
streamCost := (float64(response.Usage.InputTokens)*m.Cost.Input +
float64(response.Usage.OutputTokens)*m.Cost.Output +
float64(response.Usage.CachedInputTokens)*m.Cost.CacheRead +
float64(response.Usage.CacheWriteTokens)*m.Cost.CacheWrite) / 1e6
sess.Cost += streamCost - prevStreamCost
prevStreamCost = streamCost
}

sess.InputTokens = response.Usage.InputTokens + response.Usage.CachedInputTokens + response.Usage.CacheWriteTokens
sess.OutputTokens = response.Usage.OutputTokens

modelName := "unknown"
if m != nil {
modelName = m.Name
}
telemetry.RecordTokenUsage(ctx, modelName, sess.InputTokens, sess.OutputTokens, sess.Cost)
telemetry.RecordTokenUsage(ctx, modelName, sess.InputTokens, sess.OutputTokens, sess.TotalCost())
}

if response.RateLimit != nil {
Expand Down Expand Up @@ -1954,6 +1965,17 @@ func (r *LocalRuntime) handleHandoff(_ context.Context, _ *session.Session, tool
// for the summarization (e.g., "focus on code changes" or "include action items").
func (r *LocalRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events chan Event) {
	r.sessionCompactor.Compact(ctx, sess, additionalPrompt, events, r.currentAgent)

	// Resolve the context window of the model currently in effect for the
	// active agent. If the lookup fails or yields no model, the limit stays 0.
	var contextLimit int64
	model, err := r.modelsStore.GetModel(ctx, r.getEffectiveModelID(r.CurrentAgent()))
	if err == nil && model != nil {
		contextLimit = int64(model.Limit.Context)
	}

	// Publish the post-compaction usage right away so the sidebar shows the
	// reduced token count and context percentage, along with the updated cost,
	// without waiting for the next model response.
	usage := SessionUsage(sess, contextLimit)
	events <- NewTokenUsageEvent(sess.ID, r.currentAgent, usage)
}

// setElicitationEventsChannel sets the current events channel for elicitation requests
Expand Down
Loading