Skip to content

Commit eb2e202

Browse files
committed
feat: add ACPConversation
1 parent 0ff03dc commit eb2e202

File tree

8 files changed

+240
-59
lines changed

8 files changed

+240
-59
lines changed

chat/src/components/chat-provider.tsx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,9 @@ export function ChatProvider({ children }: PropsWithChildren) {
303303
description: message,
304304
});
305305
} finally {
306+
// Remove optimistic draft message if still present (may have been replaced by server response via SSE).
307+
setMessages((prev) => prev.filter((m) => !isDraftMessage(m)));
306308
if (type === "user") {
307-
setMessages((prevMessages) =>
308-
prevMessages.filter((m) => !isDraftMessage(m))
309-
);
310309
setLoading(false);
311310
}
312311
}

cmd/attach/attach.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,42 @@ func WriteRawInputOverHTTP(ctx context.Context, url string, msg string) error {
129129
return nil
130130
}
131131

132+
// statusResponse is used to parse the /status endpoint response.
133+
type statusResponse struct {
134+
Status string `json:"status"`
135+
AgentType string `json:"agent_type"`
136+
Backend string `json:"backend"`
137+
}
138+
139+
func checkACPMode(remoteUrl string) error {
140+
resp, err := http.Get(remoteUrl + "/status")
141+
if err != nil {
142+
return xerrors.Errorf("failed to check server status: %w", err)
143+
}
144+
defer func() { _ = resp.Body.Close() }()
145+
146+
if resp.StatusCode != http.StatusOK {
147+
return xerrors.Errorf("unexpected %d response from server: %s", resp.StatusCode, resp.Status)
148+
}
149+
150+
var status statusResponse
151+
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
152+
return xerrors.Errorf("failed to decode server status: %w", err)
153+
}
154+
155+
if status.Backend == "acp" {
156+
return xerrors.New("attach is not supported in ACP mode. The server is running with --experimental-acp which uses JSON-RPC instead of terminal emulation.")
157+
}
158+
159+
return nil
160+
}
161+
132162
func runAttach(remoteUrl string) error {
163+
// Check if server is running in ACP mode (attach not supported)
164+
if err := checkACPMode(remoteUrl); err != nil {
165+
return err
166+
}
167+
133168
ctx, cancel := context.WithCancel(context.Background())
134169
defer cancel()
135170
stdin := int(os.Stdin.Fd())

cmd/server/server.go

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/coder/agentapi/lib/httpapi"
2020
"github.com/coder/agentapi/lib/logctx"
2121
"github.com/coder/agentapi/lib/msgfmt"
22+
st "github.com/coder/agentapi/lib/screentracker"
2223
"github.com/coder/agentapi/lib/termexec"
2324
)
2425

@@ -104,11 +105,33 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
104105
}
105106

106107
printOpenAPI := viper.GetBool(FlagPrintOpenAPI)
108+
experimentalACP := viper.GetBool(FlagExperimentalACP)
109+
110+
if printOpenAPI && experimentalACP {
111+
return xerrors.Errorf("flags --%s and --%s are mutually exclusive", FlagPrintOpenAPI, FlagExperimentalACP)
112+
}
113+
114+
var agentIO st.AgentIO
115+
var transport = "pty"
107116
var process *termexec.Process
117+
var acpResult *httpapi.SetupACPResult
118+
108119
if printOpenAPI {
109-
process = nil
120+
agentIO = nil
121+
} else if experimentalACP {
122+
var err error
123+
acpResult, err = httpapi.SetupACP(ctx, httpapi.SetupACPConfig{
124+
Program: agent,
125+
ProgramArgs: argsToPass[1:],
126+
})
127+
if err != nil {
128+
return xerrors.Errorf("failed to setup ACP: %w", err)
129+
}
130+
acpIO := acpResult.AgentIO
131+
agentIO = acpIO
132+
transport = "acp"
110133
} else {
111-
process, err = httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
134+
proc, err := httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
112135
Program: agent,
113136
ProgramArgs: argsToPass[1:],
114137
TerminalWidth: termWidth,
@@ -118,11 +141,14 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
118141
if err != nil {
119142
return xerrors.Errorf("failed to setup process: %w", err)
120143
}
144+
process = proc
145+
agentIO = proc
121146
}
122147
port := viper.GetInt(FlagPort)
123148
srv, err := httpapi.NewServer(ctx, httpapi.ServerConfig{
124149
AgentType: agentType,
125-
Process: process,
150+
AgentIO: agentIO,
151+
Transport: transport,
126152
Port: port,
127153
ChatBasePath: viper.GetString(FlagChatBasePath),
128154
AllowedHosts: viper.GetStringSlice(FlagAllowedHosts),
@@ -138,19 +164,35 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
138164
}
139165
logger.Info("Starting server on port", "port", port)
140166
processExitCh := make(chan error, 1)
141-
go func() {
142-
defer close(processExitCh)
143-
if err := process.Wait(); err != nil {
144-
if errors.Is(err, termexec.ErrNonZeroExitCode) {
145-
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
146-
} else {
147-
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
167+
// Wait for process exit in PTY mode
168+
if process != nil {
169+
go func() {
170+
defer close(processExitCh)
171+
if err := process.Wait(); err != nil {
172+
if errors.Is(err, termexec.ErrNonZeroExitCode) {
173+
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
174+
} else {
175+
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
176+
}
148177
}
149-
}
150-
if err := srv.Stop(ctx); err != nil {
151-
logger.Error("Failed to stop server", "error", err)
152-
}
153-
}()
178+
if err := srv.Stop(ctx); err != nil {
179+
logger.Error("Failed to stop server", "error", err)
180+
}
181+
}()
182+
}
183+
// Wait for process exit in ACP mode
184+
if acpResult != nil {
185+
go func() {
186+
defer close(processExitCh)
187+
defer close(acpResult.Done) // Signal cleanup goroutine to exit
188+
if err := acpResult.Wait(); err != nil {
189+
processExitCh <- xerrors.Errorf("ACP process exited: %w", err)
190+
}
191+
if err := srv.Stop(ctx); err != nil {
192+
logger.Error("Failed to stop server", "error", err)
193+
}
194+
}()
195+
}
154196
if err := srv.Start(); err != nil && err != context.Canceled && err != http.ErrServerClosed {
155197
return xerrors.Errorf("failed to start server: %w", err)
156198
}
@@ -180,16 +222,17 @@ type flagSpec struct {
180222
}
181223

182224
const (
183-
FlagType = "type"
184-
FlagPort = "port"
185-
FlagPrintOpenAPI = "print-openapi"
186-
FlagChatBasePath = "chat-base-path"
187-
FlagTermWidth = "term-width"
188-
FlagTermHeight = "term-height"
189-
FlagAllowedHosts = "allowed-hosts"
190-
FlagAllowedOrigins = "allowed-origins"
191-
FlagExit = "exit"
192-
FlagInitialPrompt = "initial-prompt"
225+
FlagType = "type"
226+
FlagPort = "port"
227+
FlagPrintOpenAPI = "print-openapi"
228+
FlagChatBasePath = "chat-base-path"
229+
FlagTermWidth = "term-width"
230+
FlagTermHeight = "term-height"
231+
FlagAllowedHosts = "allowed-hosts"
232+
FlagAllowedOrigins = "allowed-origins"
233+
FlagExit = "exit"
234+
FlagInitialPrompt = "initial-prompt"
235+
FlagExperimentalACP = "experimental-acp"
193236
)
194237

195238
func CreateServerCmd() *cobra.Command {
@@ -228,6 +271,7 @@ func CreateServerCmd() *cobra.Command {
228271
// localhost:3284 is the default origin when you open the chat interface in your browser. localhost:3000 and 3001 are used during development.
229272
{FlagAllowedOrigins, "o", []string{"http://localhost:3284", "http://localhost:3000", "http://localhost:3001"}, "HTTP allowed origins. Use '*' for all, comma-separated list via flag, space-separated list via AGENTAPI_ALLOWED_ORIGINS env var", "stringSlice"},
230273
{FlagInitialPrompt, "I", "", "Initial prompt for the agent. Recommended only if the agent doesn't support initial prompt in interaction mode. Will be read from stdin if piped (e.g., echo 'prompt' | agentapi server -- my-agent)", "string"},
274+
{FlagExperimentalACP, "", false, "Use experimental ACP transport instead of PTY", "bool"},
231275
}
232276

233277
for _, spec := range flagSpecs {

lib/httpapi/models.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type StatusResponse struct {
3838
Body struct {
3939
Status AgentStatus `json:"status" doc:"Current agent status. 'running' means that the agent is processing a message, 'stable' means that the agent is idle and waiting for input."`
4040
AgentType mf.AgentType `json:"agent_type" doc:"Type of the agent being used by the server."`
41+
Backend string `json:"backend" doc:"Backend transport being used ('acp' or 'pty')."`
4142
}
4243
}
4344

lib/httpapi/server.go

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
mf "github.com/coder/agentapi/lib/msgfmt"
2525
st "github.com/coder/agentapi/lib/screentracker"
2626
"github.com/coder/agentapi/lib/termexec"
27+
"github.com/coder/agentapi/x/acpio"
2728
"github.com/coder/quartz"
2829
"github.com/danielgtaylor/huma/v2"
2930
"github.com/danielgtaylor/huma/v2/adapters/humachi"
@@ -42,12 +43,13 @@ type Server struct {
4243
mu sync.RWMutex
4344
logger *slog.Logger
4445
conversation st.Conversation
45-
agentio *termexec.Process
46+
agentio st.AgentIO
4647
agentType mf.AgentType
4748
emitter *EventEmitter
4849
chatBasePath string
4950
tempDir string
5051
clock quartz.Clock
52+
transport string
5153
}
5254

5355
func (s *Server) NormalizeSchema(schema any) any {
@@ -98,7 +100,8 @@ const snapshotInterval = 25 * time.Millisecond
98100

99101
type ServerConfig struct {
100102
AgentType mf.AgentType
101-
Process *termexec.Process
103+
AgentIO st.AgentIO
104+
Transport string
102105
Port int
103106
ChatBasePath string
104107
AllowedHosts []string
@@ -252,18 +255,34 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
252255
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
253256
}
254257

255-
conversation := st.NewPTY(ctx, st.PTYConversationConfig{
256-
AgentType: config.AgentType,
257-
AgentIO: config.Process,
258-
Clock: config.Clock,
259-
SnapshotInterval: snapshotInterval,
260-
ScreenStabilityLength: 2 * time.Second,
261-
FormatMessage: formatMessage,
262-
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
263-
FormatToolCall: formatToolCall,
264-
InitialPrompt: initialPrompt,
265-
Logger: logger,
266-
}, emitter)
258+
// Create appropriate conversation based on transport type
259+
var conversation st.Conversation
260+
if config.Transport == "acp" {
261+
// For ACP, cast AgentIO to *acpio.ACPAgentIO
262+
acpIO, ok := config.AgentIO.(*acpio.ACPAgentIO)
263+
if !ok {
264+
return nil, fmt.Errorf("ACP transport requires ACPAgentIO")
265+
}
266+
conversation = acpio.NewACPConversation(ctx, acpIO, logger, initialPrompt, emitter, config.Clock)
267+
} else {
268+
// Default to PTY transport
269+
proc, ok := config.AgentIO.(*termexec.Process)
270+
if !ok && config.AgentIO != nil {
271+
return nil, fmt.Errorf("PTY transport requires termexec.Process")
272+
}
273+
conversation = st.NewPTY(ctx, st.PTYConversationConfig{
274+
AgentType: config.AgentType,
275+
AgentIO: proc,
276+
Clock: config.Clock,
277+
SnapshotInterval: snapshotInterval,
278+
ScreenStabilityLength: 2 * time.Second,
279+
FormatMessage: formatMessage,
280+
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
281+
FormatToolCall: formatToolCall,
282+
InitialPrompt: initialPrompt,
283+
Logger: logger,
284+
}, emitter)
285+
}
267286

268287
// Create temporary directory for uploads
269288
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
@@ -278,24 +297,25 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
278297
port: config.Port,
279298
conversation: conversation,
280299
logger: logger,
281-
agentio: config.Process,
300+
agentio: config.AgentIO,
282301
agentType: config.AgentType,
283302
emitter: emitter,
284303
chatBasePath: strings.TrimSuffix(config.ChatBasePath, "/"),
285304
tempDir: tempDir,
286305
clock: config.Clock,
306+
transport: config.Transport,
287307
}
288308

289309
// Register API routes
290310
s.registerRoutes()
291311

292-
// Start the conversation polling loop if we have a process.
293-
// Process is nil only when --print-openapi is used (no agent runs).
294-
// The process is already running at this point - termexec.StartProcess()
295-
// blocks until the PTY is created and the process is active. Agent
296-
// readiness (waiting for the prompt) is handled asynchronously inside
297-
// conversation.Start() via ReadyForInitialPrompt.
298-
if config.Process != nil {
312+
// Start the conversation polling loop if we have an agent IO.
313+
// AgentIO is nil only when --print-openapi is used (no agent runs).
314+
// For PTY transport, the process is already running at this point -
315+
// termexec.StartProcess() blocks until the PTY is created and the process
316+
// is active. Agent readiness (waiting for the prompt) is handled
317+
// asynchronously inside conversation.Start() via ReadyForInitialPrompt.
318+
if config.AgentIO != nil {
299319
s.conversation.Start(ctx)
300320
}
301321

@@ -417,6 +437,7 @@ func (s *Server) getStatus(ctx context.Context, input *struct{}) (*StatusRespons
417437
resp := &StatusResponse{}
418438
resp.Body.Status = agentStatus
419439
resp.Body.AgentType = s.agentType
440+
resp.Body.Backend = s.transport
420441

421442
return resp, nil
422443
}

0 commit comments

Comments
 (0)