Skip to content

Commit 5d6b259

Browse files
authored
feat: add ACPConversation (#189)
Relates to coder/internal#1333 Adds ACPConversation type to agentapi server
1 parent a967615 commit 5d6b259

File tree

8 files changed

+257
-63
lines changed

8 files changed

+257
-63
lines changed

chat/src/components/chat-provider.tsx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,9 @@ export function ChatProvider({ children }: PropsWithChildren) {
303303
description: message,
304304
});
305305
} finally {
306+
// Remove optimistic draft message if still present (may have been replaced by server response via SSE).
307+
setMessages((prev) => prev.filter((m) => !isDraftMessage(m)));
306308
if (type === "user") {
307-
setMessages((prevMessages) =>
308-
prevMessages.filter((m) => !isDraftMessage(m))
309-
);
310309
setLoading(false);
311310
}
312311
}

cmd/attach/attach.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,33 @@ func WriteRawInputOverHTTP(ctx context.Context, url string, msg string) error {
129129
return nil
130130
}
131131

132-
func runAttach(remoteUrl string) error {
132+
func checkACPMode(remoteURL string) (bool, error) {
133+
resp, err := http.Get(remoteURL + "/status")
134+
if err != nil {
135+
return false, xerrors.Errorf("failed to check server status: %w", err)
136+
}
137+
defer func() { _ = resp.Body.Close() }()
138+
139+
if resp.StatusCode != http.StatusOK {
140+
return false, xerrors.Errorf("unexpected %d response from server: %s", resp.StatusCode, resp.Status)
141+
}
142+
143+
var status httpapi.StatusResponse
144+
if err := json.NewDecoder(resp.Body).Decode(&status.Body); err != nil {
145+
return false, xerrors.Errorf("failed to decode server status: %w", err)
146+
}
147+
148+
return status.Body.Transport == httpapi.TransportACP, nil
149+
}
150+
151+
func runAttach(remoteURL string) error {
152+
// Check if server is running in ACP mode (attach not supported)
153+
if isACP, err := checkACPMode(remoteURL); err != nil {
154+
_, _ = fmt.Fprintf(os.Stderr, "WARN: Unable to check server: %s", err.Error())
155+
} else if isACP {
156+
return xerrors.New("attach is not yet supported in ACP mode")
157+
}
158+
133159
ctx, cancel := context.WithCancel(context.Background())
134160
defer cancel()
135161
stdin := int(os.Stdin.Fd())
@@ -152,7 +178,7 @@ func runAttach(remoteUrl string) error {
152178
readScreenErrCh := make(chan error, 1)
153179
go func() {
154180
defer close(readScreenErrCh)
155-
if err := ReadScreenOverHTTP(ctx, remoteUrl+"/internal/screen", screenCh); err != nil {
181+
if err := ReadScreenOverHTTP(ctx, remoteURL+"/internal/screen", screenCh); err != nil {
156182
if errors.Is(err, context.Canceled) {
157183
return
158184
}
@@ -175,7 +201,7 @@ func runAttach(remoteUrl string) error {
175201
if input == "\x03" {
176202
continue
177203
}
178-
if err := WriteRawInputOverHTTP(ctx, remoteUrl+"/message", input); err != nil {
204+
if err := WriteRawInputOverHTTP(ctx, remoteURL+"/message", input); err != nil {
179205
writeRawInputErrCh <- xerrors.Errorf("failed to write raw input: %w", err)
180206
return
181207
}

cmd/server/server.go

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/coder/agentapi/lib/httpapi"
2020
"github.com/coder/agentapi/lib/logctx"
2121
"github.com/coder/agentapi/lib/msgfmt"
22+
st "github.com/coder/agentapi/lib/screentracker"
2223
"github.com/coder/agentapi/lib/termexec"
2324
)
2425

@@ -104,11 +105,33 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
104105
}
105106

106107
printOpenAPI := viper.GetBool(FlagPrintOpenAPI)
108+
experimentalACP := viper.GetBool(FlagExperimentalACP)
109+
110+
if printOpenAPI && experimentalACP {
111+
return xerrors.Errorf("flags --%s and --%s are mutually exclusive", FlagPrintOpenAPI, FlagExperimentalACP)
112+
}
113+
114+
var agentIO st.AgentIO
115+
transport := "pty"
107116
var process *termexec.Process
117+
var acpResult *httpapi.SetupACPResult
118+
108119
if printOpenAPI {
109-
process = nil
120+
agentIO = nil
121+
} else if experimentalACP {
122+
var err error
123+
acpResult, err = httpapi.SetupACP(ctx, httpapi.SetupACPConfig{
124+
Program: agent,
125+
ProgramArgs: argsToPass[1:],
126+
})
127+
if err != nil {
128+
return xerrors.Errorf("failed to setup ACP: %w", err)
129+
}
130+
acpIO := acpResult.AgentIO
131+
agentIO = acpIO
132+
transport = "acp"
110133
} else {
111-
process, err = httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
134+
proc, err := httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
112135
Program: agent,
113136
ProgramArgs: argsToPass[1:],
114137
TerminalWidth: termWidth,
@@ -118,11 +141,14 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
118141
if err != nil {
119142
return xerrors.Errorf("failed to setup process: %w", err)
120143
}
144+
process = proc
145+
agentIO = proc
121146
}
122147
port := viper.GetInt(FlagPort)
123148
srv, err := httpapi.NewServer(ctx, httpapi.ServerConfig{
124149
AgentType: agentType,
125-
Process: process,
150+
AgentIO: agentIO,
151+
Transport: httpapi.Transport(transport),
126152
Port: port,
127153
ChatBasePath: viper.GetString(FlagChatBasePath),
128154
AllowedHosts: viper.GetStringSlice(FlagAllowedHosts),
@@ -138,19 +164,35 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
138164
}
139165
logger.Info("Starting server on port", "port", port)
140166
processExitCh := make(chan error, 1)
141-
go func() {
142-
defer close(processExitCh)
143-
if err := process.Wait(); err != nil {
144-
if errors.Is(err, termexec.ErrNonZeroExitCode) {
145-
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
146-
} else {
147-
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
167+
// Wait for process exit in PTY mode
168+
if process != nil {
169+
go func() {
170+
defer close(processExitCh)
171+
if err := process.Wait(); err != nil {
172+
if errors.Is(err, termexec.ErrNonZeroExitCode) {
173+
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
174+
} else {
175+
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
176+
}
148177
}
149-
}
150-
if err := srv.Stop(ctx); err != nil {
151-
logger.Error("Failed to stop server", "error", err)
152-
}
153-
}()
178+
if err := srv.Stop(ctx); err != nil {
179+
logger.Error("Failed to stop server", "error", err)
180+
}
181+
}()
182+
}
183+
// Wait for process exit in ACP mode
184+
if acpResult != nil {
185+
go func() {
186+
defer close(processExitCh)
187+
defer close(acpResult.Done) // Signal cleanup goroutine to exit
188+
if err := acpResult.Wait(); err != nil {
189+
processExitCh <- xerrors.Errorf("ACP process exited: %w", err)
190+
}
191+
if err := srv.Stop(ctx); err != nil {
192+
logger.Error("Failed to stop server", "error", err)
193+
}
194+
}()
195+
}
154196
if err := srv.Start(); err != nil && err != context.Canceled && err != http.ErrServerClosed {
155197
return xerrors.Errorf("failed to start server: %w", err)
156198
}
@@ -180,16 +222,17 @@ type flagSpec struct {
180222
}
181223

182224
const (
183-
FlagType = "type"
184-
FlagPort = "port"
185-
FlagPrintOpenAPI = "print-openapi"
186-
FlagChatBasePath = "chat-base-path"
187-
FlagTermWidth = "term-width"
188-
FlagTermHeight = "term-height"
189-
FlagAllowedHosts = "allowed-hosts"
190-
FlagAllowedOrigins = "allowed-origins"
191-
FlagExit = "exit"
192-
FlagInitialPrompt = "initial-prompt"
225+
FlagType = "type"
226+
FlagPort = "port"
227+
FlagPrintOpenAPI = "print-openapi"
228+
FlagChatBasePath = "chat-base-path"
229+
FlagTermWidth = "term-width"
230+
FlagTermHeight = "term-height"
231+
FlagAllowedHosts = "allowed-hosts"
232+
FlagAllowedOrigins = "allowed-origins"
233+
FlagExit = "exit"
234+
FlagInitialPrompt = "initial-prompt"
235+
FlagExperimentalACP = "experimental-acp"
193236
)
194237

195238
func CreateServerCmd() *cobra.Command {
@@ -228,6 +271,7 @@ func CreateServerCmd() *cobra.Command {
228271
// localhost:3284 is the default origin when you open the chat interface in your browser. localhost:3000 and 3001 are used during development.
229272
{FlagAllowedOrigins, "o", []string{"http://localhost:3284", "http://localhost:3000", "http://localhost:3001"}, "HTTP allowed origins. Use '*' for all, comma-separated list via flag, space-separated list via AGENTAPI_ALLOWED_ORIGINS env var", "stringSlice"},
230273
{FlagInitialPrompt, "I", "", "Initial prompt for the agent. Recommended only if the agent doesn't support initial prompt in interaction mode. Will be read from stdin if piped (e.g., echo 'prompt' | agentapi server -- my-agent)", "string"},
274+
{FlagExperimentalACP, "", false, "Use experimental ACP transport instead of PTY", "bool"},
231275
}
232276

233277
for _, spec := range flagSpecs {

lib/httpapi/models.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ func (m MessageType) Schema(r huma.Registry) *huma.Schema {
2525
return util.OpenAPISchema(r, "MessageType", MessageTypeValues)
2626
}
2727

28+
type Transport string
29+
30+
const (
31+
TransportPTY Transport = "pty"
32+
TransportACP Transport = "acp"
33+
)
34+
35+
var TransportValues = []Transport{
36+
TransportPTY,
37+
TransportACP,
38+
}
39+
40+
func (tr Transport) Schema(r huma.Registry) *huma.Schema {
41+
return util.OpenAPISchema(r, "Transport", TransportValues)
42+
}
43+
2844
// Message represents a message
2945
type Message struct {
3046
Id int `json:"id" doc:"Unique identifier for the message. This identifier also represents the order of the message in the conversation history."`
@@ -38,6 +54,7 @@ type StatusResponse struct {
3854
Body struct {
3955
Status AgentStatus `json:"status" doc:"Current agent status. 'running' means that the agent is processing a message, 'stable' means that the agent is idle and waiting for input."`
4056
AgentType mf.AgentType `json:"agent_type" doc:"Type of the agent being used by the server."`
57+
Transport Transport `json:"transport" doc:"Backend transport being used ('acp' or 'pty')."`
4158
}
4259
}
4360

lib/httpapi/server.go

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
mf "github.com/coder/agentapi/lib/msgfmt"
2525
st "github.com/coder/agentapi/lib/screentracker"
2626
"github.com/coder/agentapi/lib/termexec"
27+
"github.com/coder/agentapi/x/acpio"
2728
"github.com/coder/quartz"
2829
"github.com/danielgtaylor/huma/v2"
2930
"github.com/danielgtaylor/huma/v2/adapters/humachi"
@@ -42,12 +43,13 @@ type Server struct {
4243
mu sync.RWMutex
4344
logger *slog.Logger
4445
conversation st.Conversation
45-
agentio *termexec.Process
46+
agentio st.AgentIO
4647
agentType mf.AgentType
4748
emitter *EventEmitter
4849
chatBasePath string
4950
tempDir string
5051
clock quartz.Clock
52+
transport Transport
5153
}
5254

5355
func (s *Server) NormalizeSchema(schema any) any {
@@ -98,7 +100,8 @@ const snapshotInterval = 25 * time.Millisecond
98100

99101
type ServerConfig struct {
100102
AgentType mf.AgentType
101-
Process *termexec.Process
103+
AgentIO st.AgentIO
104+
Transport Transport
102105
Port int
103106
ChatBasePath string
104107
AllowedHosts []string
@@ -252,18 +255,32 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
252255
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
253256
}
254257

255-
conversation := st.NewPTY(ctx, st.PTYConversationConfig{
256-
AgentType: config.AgentType,
257-
AgentIO: config.Process,
258-
Clock: config.Clock,
259-
SnapshotInterval: snapshotInterval,
260-
ScreenStabilityLength: 2 * time.Second,
261-
FormatMessage: formatMessage,
262-
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
263-
FormatToolCall: formatToolCall,
264-
InitialPrompt: initialPrompt,
265-
Logger: logger,
266-
}, emitter)
258+
var conversation st.Conversation
259+
if config.Transport == TransportACP {
260+
// For ACP, cast AgentIO to *acpio.ACPAgentIO
261+
acpIO, ok := config.AgentIO.(*acpio.ACPAgentIO)
262+
if !ok {
263+
return nil, fmt.Errorf("ACP transport requires ACPAgentIO")
264+
}
265+
conversation = acpio.NewACPConversation(ctx, acpIO, logger, initialPrompt, emitter, config.Clock)
266+
} else {
267+
proc, ok := config.AgentIO.(*termexec.Process)
268+
if !ok && config.AgentIO != nil {
269+
return nil, fmt.Errorf("PTY transport requires termexec.Process")
270+
}
271+
conversation = st.NewPTY(ctx, st.PTYConversationConfig{
272+
AgentType: config.AgentType,
273+
AgentIO: proc,
274+
Clock: config.Clock,
275+
SnapshotInterval: snapshotInterval,
276+
ScreenStabilityLength: 2 * time.Second,
277+
FormatMessage: formatMessage,
278+
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
279+
FormatToolCall: formatToolCall,
280+
InitialPrompt: initialPrompt,
281+
Logger: logger,
282+
}, emitter)
283+
}
267284

268285
// Create temporary directory for uploads
269286
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
@@ -278,24 +295,25 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
278295
port: config.Port,
279296
conversation: conversation,
280297
logger: logger,
281-
agentio: config.Process,
298+
agentio: config.AgentIO,
282299
agentType: config.AgentType,
283300
emitter: emitter,
284301
chatBasePath: strings.TrimSuffix(config.ChatBasePath, "/"),
285302
tempDir: tempDir,
286303
clock: config.Clock,
304+
transport: config.Transport,
287305
}
288306

289307
// Register API routes
290308
s.registerRoutes()
291309

292-
// Start the conversation polling loop if we have a process.
293-
// Process is nil only when --print-openapi is used (no agent runs).
294-
// The process is already running at this point - termexec.StartProcess()
295-
// blocks until the PTY is created and the process is active. Agent
296-
// readiness (waiting for the prompt) is handled asynchronously inside
297-
// conversation.Start() via ReadyForInitialPrompt.
298-
if config.Process != nil {
310+
// Start the conversation polling loop if we have an agent IO.
311+
// AgentIO is nil only when --print-openapi is used (no agent runs).
312+
// For PTY transport, the process is already running at this point -
313+
// termexec.StartProcess() blocks until the PTY is created and the process
314+
// is active. Agent readiness (waiting for the prompt) is handled
315+
// asynchronously inside conversation.Start() via ReadyForInitialPrompt.
316+
if config.AgentIO != nil {
299317
s.conversation.Start(ctx)
300318
}
301319

@@ -417,6 +435,7 @@ func (s *Server) getStatus(ctx context.Context, input *struct{}) (*StatusRespons
417435
resp := &StatusResponse{}
418436
resp.Body.Status = agentStatus
419437
resp.Body.AgentType = s.agentType
438+
resp.Body.Transport = s.transport
420439

421440
return resp, nil
422441
}

0 commit comments

Comments
 (0)