Skip to content

Commit 074e56c

Browse files
authored
Merge branch 'main' into 35C4n0r/handle-partial-tool-call
2 parents 751310f + c171c14 commit 074e56c

File tree

7 files changed

+633
-436
lines changed

7 files changed

+633
-436
lines changed

cmd/server/server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
136136
fmt.Println(srv.GetOpenAPI())
137137
return nil
138138
}
139-
srv.StartSnapshotLoop(ctx)
140139
logger.Info("Starting server on port", "port", port)
141140
processExitCh := make(chan error, 1)
142141
go func() {

e2e/echo.go

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,10 @@ func runEchoAgent(scriptPath string) {
8989
if entry.ThinkDurationMS > 0 {
9090
redrawTerminal(messages, true)
9191
spinnerCtx, spinnerCancel := context.WithCancel(ctx)
92-
go runSpinner(spinnerCtx)
92+
spinnerDone := runSpinner(spinnerCtx)
9393
time.Sleep(time.Duration(entry.ThinkDurationMS) * time.Millisecond)
94-
if spinnerCancel != nil {
95-
spinnerCancel()
96-
}
94+
spinnerCancel()
95+
<-spinnerDone
9796
}
9897

9998
messages = append(messages, st.ConversationMessage{
@@ -133,9 +132,10 @@ func runEchoAgent(scriptPath string) {
133132
if entry.ThinkDurationMS > 0 {
134133
redrawTerminal(messages, true)
135134
spinnerCtx, spinnerCancel := context.WithCancel(ctx)
136-
go runSpinner(spinnerCtx)
135+
spinnerDone := runSpinner(spinnerCtx)
137136
time.Sleep(time.Duration(entry.ThinkDurationMS) * time.Millisecond)
138137
spinnerCancel()
138+
<-spinnerDone
139139
}
140140

141141
messages = append(messages, st.ConversationMessage{
@@ -190,21 +190,26 @@ func cleanTerminalInput(input string) string {
190190
return strings.TrimSpace(input)
191191
}
192192

193-
func runSpinner(ctx context.Context) {
194-
spinnerChars := []string{"|", "/", "-", "\\"}
195-
ticker := time.NewTicker(200 * time.Millisecond)
196-
defer ticker.Stop()
197-
i := 0
198-
199-
for {
200-
select {
201-
case <-ticker.C:
202-
fmt.Printf("\rThinking %s", spinnerChars[i%len(spinnerChars)])
203-
i++
204-
case <-ctx.Done():
205-
// Clear spinner on cancellation
206-
fmt.Print("\r" + strings.Repeat(" ", 20) + "\r")
207-
return
193+
func runSpinner(ctx context.Context) <-chan struct{} {
194+
done := make(chan struct{})
195+
go func() {
196+
defer close(done)
197+
spinnerChars := []string{"|", "/", "-", "\\"}
198+
ticker := time.NewTicker(200 * time.Millisecond)
199+
defer ticker.Stop()
200+
i := 0
201+
202+
for {
203+
select {
204+
case <-ticker.C:
205+
fmt.Printf("\rThinking %s", spinnerChars[i%len(spinnerChars)])
206+
i++
207+
case <-ctx.Done():
208+
// Clear spinner on cancellation
209+
fmt.Print("\r" + strings.Repeat(" ", 20) + "\r")
210+
return
211+
}
208212
}
209-
}
213+
}()
214+
return done
210215
}

e2e/echo_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,11 @@ func setup(ctx context.Context, t testing.TB, p *params) ([]ScriptEntry, *agenta
133133
cwd, err := os.Getwd()
134134
require.NoError(t, err, "Failed to get current working directory")
135135
binaryPath = filepath.Join(cwd, "..", "out", "agentapi")
136-
_, err = os.Stat(binaryPath)
137-
if err != nil {
138-
t.Logf("Building binary at %s", binaryPath)
139-
buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
140-
buildCmd.Dir = filepath.Join(cwd, "..")
141-
t.Logf("run: %s", buildCmd.String())
142-
require.NoError(t, buildCmd.Run(), "Failed to build binary")
143-
}
136+
t.Logf("Building binary at %s", binaryPath)
137+
buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
138+
buildCmd.Dir = filepath.Join(cwd, "..")
139+
t.Logf("run: %s", buildCmd.String())
140+
require.NoError(t, buildCmd.Run(), "Failed to build binary")
144141
}
145142

146143
serverPort, err := getFreePort()
@@ -254,7 +251,11 @@ func waitAgentAPIStable(ctx context.Context, t testing.TB, apiClient *agentapisd
254251
return nil
255252
}
256253
} else {
257-
t.Logf("Got %T event", evt)
254+
var sb strings.Builder
255+
if err := json.NewEncoder(&sb).Encode(evt); err != nil {
256+
t.Logf("Failed to encode event: %v", err)
257+
}
258+
t.Logf("Got event: %s", sb.String())
258259
}
259260
case err := <-errs:
260261
return fmt.Errorf("read events: %w", err)

lib/httpapi/server.go

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Server struct {
4141
srv *http.Server
4242
mu sync.RWMutex
4343
logger *slog.Logger
44-
conversation *st.PTYConversation
44+
conversation st.Conversation
4545
agentio *termexec.Process
4646
agentType mf.AgentType
4747
emitter *EventEmitter
@@ -244,6 +244,14 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
244244
return mf.FormatToolCall(config.AgentType, message)
245245
}
246246

247+
emitter := NewEventEmitter(1024)
248+
249+
// Format initial prompt into message parts if provided
250+
var initialPrompt []st.MessagePart
251+
if config.InitialPrompt != "" {
252+
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
253+
}
254+
247255
conversation := st.NewPTY(ctx, st.PTYConversationConfig{
248256
AgentType: config.AgentType,
249257
AgentIO: config.Process,
@@ -253,9 +261,17 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
253261
FormatMessage: formatMessage,
254262
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
255263
FormatToolCall: formatToolCall,
256-
Logger: logger,
257-
}, config.InitialPrompt)
258-
emitter := NewEventEmitter(1024)
264+
InitialPrompt: initialPrompt,
265+
// OnSnapshot uses a callback rather than passing the emitter directly
266+
// to keep the screentracker package decoupled from httpapi concerns.
267+
// This preserves clean package boundaries and avoids import cycles.
268+
OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) {
269+
emitter.UpdateStatusAndEmitChanges(status, config.AgentType)
270+
emitter.UpdateMessagesAndEmitChanges(messages)
271+
emitter.UpdateScreenAndEmitChanges(screen)
272+
},
273+
Logger: logger,
274+
})
259275

260276
// Create temporary directory for uploads
261277
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
@@ -281,6 +297,16 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
281297
// Register API routes
282298
s.registerRoutes()
283299

300+
// Start the conversation polling loop if we have a process.
301+
// Process is nil only when --print-openapi is used (no agent runs).
302+
// The process is already running at this point - termexec.StartProcess()
303+
// blocks until the PTY is created and the process is active. Agent
304+
// readiness (waiting for the prompt) is handled asynchronously inside
305+
// conversation.Start() via ReadyForInitialPrompt.
306+
if config.Process != nil {
307+
s.conversation.Start(ctx)
308+
}
309+
284310
return s, nil
285311
}
286312

@@ -336,38 +362,6 @@ func sseMiddleware(ctx huma.Context, next func(huma.Context)) {
336362
next(ctx)
337363
}
338364

339-
func (s *Server) StartSnapshotLoop(ctx context.Context) {
340-
s.conversation.Start(ctx)
341-
go func() {
342-
ticker := s.clock.NewTicker(snapshotInterval)
343-
defer ticker.Stop()
344-
for {
345-
currentStatus := s.conversation.Status()
346-
347-
// Send initial prompt when agent becomes stable for the first time
348-
if !s.conversation.InitialPromptSent && convertStatus(currentStatus) == AgentStatusStable {
349-
if err := s.conversation.Send(FormatMessage(s.agentType, s.conversation.InitialPrompt)...); err != nil {
350-
s.logger.Error("Failed to send initial prompt", "error", err)
351-
} else {
352-
s.conversation.InitialPromptSent = true
353-
s.conversation.ReadyForInitialPrompt = false
354-
currentStatus = st.ConversationStatusChanging
355-
s.logger.Info("Initial prompt sent successfully")
356-
}
357-
}
358-
s.emitter.UpdateStatusAndEmitChanges(currentStatus, s.agentType)
359-
s.emitter.UpdateMessagesAndEmitChanges(s.conversation.Messages())
360-
s.emitter.UpdateScreenAndEmitChanges(s.conversation.Text())
361-
362-
select {
363-
case <-ctx.Done():
364-
return
365-
case <-ticker.C:
366-
}
367-
}
368-
}()
369-
}
370-
371365
// registerRoutes sets up all API endpoints
372366
func (s *Server) registerRoutes() {
373367
// GET /status endpoint

0 commit comments

Comments
 (0)