Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 98 additions & 11 deletions pkg/deck/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
roleUser = "user"
groupIDPrefix = "group:"
groupWindow = time.Hour
sessionCacheTTL = 10 * time.Second
sessionCacheTTL = 30 * time.Second
messageGroupWindow = 5 * time.Second
maxGroupedTextChars = 4000
)
Expand Down Expand Up @@ -91,6 +91,7 @@ type sessionGroup struct {
type sessionCache struct {
mu sync.RWMutex
candidates []sessionCandidate
byID map[string]*sessionCandidate
loadedAt time.Time
}

Expand Down Expand Up @@ -185,13 +186,57 @@ func (q *Query) cachedSessionCandidates() []sessionCandidate {
return copySessionCandidates(q.cache.candidates)
}

// cachedSessionCandidate returns a single candidate by session ID from the
// cache index, or nil if the cache is stale/empty or the ID is not found.
func (q *Query) cachedSessionCandidate(sessionID string) *sessionCandidate {
q.cache.mu.RLock()
defer q.cache.mu.RUnlock()

if len(q.cache.byID) == 0 {
return nil
}
if time.Since(q.cache.loadedAt) > sessionCacheTTL {
return nil
}

c, ok := q.cache.byID[sessionID]
if !ok {
return nil
}

cp := *c
return &cp
}

func (q *Query) storeSessionCandidates(candidates []sessionCandidate) {
q.cache.mu.Lock()
defer q.cache.mu.Unlock()
q.cache.candidates = copySessionCandidates(candidates)
q.cache.byID = buildCandidateIndex(q.cache.candidates)
q.cache.loadedAt = time.Now()
}

// candidateByID performs a linear scan for a session ID in a slice.
// Used on the slow path after a fresh load before the index is populated.
func candidateByID(candidates []sessionCandidate, sessionID string) (sessionCandidate, bool) {
for _, c := range candidates {
if c.summary.ID == sessionID {
return c, true
}
}
return sessionCandidate{}, false
}

// buildCandidateIndex returns a map keyed by session ID pointing into the
// given slice. The pointers are valid for the lifetime of the slice.
func buildCandidateIndex(candidates []sessionCandidate) map[string]*sessionCandidate {
idx := make(map[string]*sessionCandidate, len(candidates))
for i := range candidates {
idx[candidates[i].summary.ID] = &candidates[i]
}
return idx
}

func copySessionCandidates(candidates []sessionCandidate) []sessionCandidate {
if len(candidates) == 0 {
return nil
Expand Down Expand Up @@ -411,7 +456,7 @@ func firstNonEmptyModel(members []sessionCandidate) string {
}

func (q *Query) Overview(ctx context.Context, filters Filters) (*Overview, error) {
candidates, err := q.loadSessionCandidates(ctx, false)
candidates, err := q.loadSessionCandidates(ctx, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -473,6 +518,35 @@ func (q *Query) SessionDetail(ctx context.Context, sessionID string) (*SessionDe
return q.groupSessionDetail(ctx, sessionID)
}

// Fast path: O(1) lookup in the cache index.
if c := q.cachedSessionCandidate(sessionID); c != nil {
messages, toolFrequency := q.buildSessionMessages(c.nodes)
grouped := buildGroupedMessages(messages)
return &SessionDetail{
Summary: c.summary,
Messages: messages,
GroupedMessages: grouped,
ToolFrequency: toolFrequency,
}, nil
}

// Slow path: reload candidates (cache miss or stale) and try again.
candidates, err := q.loadSessionCandidates(ctx, false)
if err != nil {
return nil, err
}
if c, ok := candidateByID(candidates, sessionID); ok {
messages, toolFrequency := q.buildSessionMessages(c.nodes)
grouped := buildGroupedMessages(messages)
return &SessionDetail{
Summary: c.summary,
Messages: messages,
GroupedMessages: grouped,
ToolFrequency: toolFrequency,
}, nil
}

// Fallback: session is brand-new and not yet in candidates.
leaf, err := q.client.Node.Get(ctx, sessionID)
if err != nil {
return nil, fmt.Errorf("get session: %w", err)
Expand All @@ -490,14 +564,12 @@ func (q *Query) SessionDetail(ctx context.Context, sessionID string) (*SessionDe

messages, toolFrequency := q.buildSessionMessages(nodes)
grouped := buildGroupedMessages(messages)
detail := &SessionDetail{
return &SessionDetail{
Summary: summary,
Messages: messages,
GroupedMessages: grouped,
ToolFrequency: toolFrequency,
}

return detail, nil
}, nil
}

func (q *Query) groupSessionDetail(ctx context.Context, sessionID string) (*SessionDetail, error) {
Expand Down Expand Up @@ -1242,7 +1314,7 @@ func SortSessions(sessions []SessionSummary, sortKey, sortDir string) {
}

func (q *Query) AnalyticsOverview(ctx context.Context, filters Filters) (*AnalyticsOverview, error) {
candidates, err := q.loadSessionCandidates(ctx, false)
candidates, err := q.loadSessionCandidates(ctx, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1287,15 +1359,15 @@ func (q *Query) AnalyticsOverview(ctx context.Context, filters Filters) (*Analyt
for _, member := range group.members {
for _, n := range member.nodes {
blocks, _ := parseContentBlocks(n.Content)
for _, tool := range extractToolCalls(blocks) {
tools := extractToolCalls(blocks)
hasErr := blocksHaveToolError(blocks)
for _, tool := range tools {
if _, ok := toolGlobal[tool]; !ok {
toolGlobal[tool] = &ToolMetric{Name: tool}
}
toolGlobal[tool].Count++
sessionTools[tool] = true
}
if blocksHaveToolError(blocks) {
for _, tool := range extractToolCalls(blocks) {
if hasErr {
toolErrors[tool]++
}
}
Expand Down Expand Up @@ -1400,6 +1472,21 @@ func (q *Query) SessionAnalytics(ctx context.Context, sessionID string) (*Sessio
return q.groupSessionAnalytics(ctx, sessionID)
}

// Fast path: O(1) lookup in the cache index.
if c := q.cachedSessionCandidate(sessionID); c != nil {
return buildSessionAnalytics(sessionID, c.nodes), nil
}

// Slow path: reload candidates (cache miss or stale) and try again.
candidates, err := q.loadSessionCandidates(ctx, false)
if err != nil {
return nil, err
}
if c, ok := candidateByID(candidates, sessionID); ok {
return buildSessionAnalytics(sessionID, c.nodes), nil
}

// Fallback: session is brand-new and not yet in candidates.
leaf, err := q.client.Node.Get(ctx, sessionID)
if err != nil {
return nil, fmt.Errorf("get session: %w", err)
Expand Down
Loading