Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions cmd/picoclaw/cmd_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/cron"
"github.com/sipeed/picoclaw/pkg/devices"
"github.com/sipeed/picoclaw/pkg/gateway"
"github.com/sipeed/picoclaw/pkg/health"
"github.com/sipeed/picoclaw/pkg/heartbeat"
"github.com/sipeed/picoclaw/pkg/logger"
Expand All @@ -29,14 +30,25 @@ import (
)

func gatewayCmd() {
// Check for --debug flag
args := os.Args[2:]
for _, arg := range args {
if arg == "--help" || arg == "-h" {
gatewayHelp()
os.Exit(0)
}
}

// Parse flags: default WebSocket on; --no-websocket for health-only
enableWebSocket := true
for _, arg := range args {
if arg == "--debug" || arg == "-d" {
logger.SetLevel(logger.DEBUG)
fmt.Println("πŸ” Debug mode enabled")
break
}
if arg == "--no-websocket" {
enableWebSocket = false
}
// --enable-websocket is a no-op (WebSocket is on by default)
}

cfg, err := loadConfig()
Expand Down Expand Up @@ -196,13 +208,24 @@ func gatewayCmd() {
fmt.Printf("Error starting channels: %v\n", err)
}

healthServer := health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
go func() {
if err := healthServer.Start(); err != nil && err != http.ErrServerClosed {
logger.ErrorCF("health", "Health server error", map[string]any{"error": err.Error()})
}
}()
fmt.Printf("βœ“ Health endpoints available at http://%s:%d/health and /ready\n", cfg.Gateway.Host, cfg.Gateway.Port)
var healthServer *health.Server
if enableWebSocket {
gw := gateway.NewServer(&cfg.Gateway, agentLoop.GetRegistry(), msgBus)
go func() {
if err := gw.Start(ctx); err != nil && err != http.ErrServerClosed {
logger.ErrorCF("gateway", "Gateway server error", map[string]any{"error": err.Error()})
}
}()
fmt.Printf("βœ“ WebSocket Gateway and health at http://%s:%d (/, /health, /ready)\n", cfg.Gateway.Host, cfg.Gateway.Port)
} else {
healthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
go func() {
if err := healthServer.Start(); err != nil && err != http.ErrServerClosed {
logger.ErrorCF("health", "Health server error", map[string]any{"error": err.Error()})
}
}()
fmt.Printf("βœ“ Health endpoints available at http://%s:%d/health and /ready\n", cfg.Gateway.Host, cfg.Gateway.Port)
}

go agentLoop.Run(ctx)

Expand All @@ -215,7 +238,9 @@ func gatewayCmd() {
cp.Close()
}
cancel()
healthServer.Stop(context.Background())
if healthServer != nil {
healthServer.Stop(context.Background())
}
deviceService.Stop()
heartbeatService.Stop()
cronService.Stop()
Expand All @@ -224,6 +249,20 @@ func gatewayCmd() {
fmt.Println("βœ“ Gateway stopped")
}

func gatewayHelp() {
fmt.Println("\nStart the PicoClaw gateway (channels, agent, health). WebSocket gateway is enabled by default.")
fmt.Println()
fmt.Println("Usage: picoclaw gateway [options]")
fmt.Println()
fmt.Println("Options:")
fmt.Println(" -d, --debug Enable debug logging")
fmt.Println(" --no-websocket Only serve /health and /ready (no WebSocket gateway)")
fmt.Println()
fmt.Println("Examples:")
fmt.Println(" picoclaw gateway Start with WebSocket gateway and health endpoints")
fmt.Println(" picoclaw gateway --no-websocket Health endpoints only")
}

func setupCronTool(
agentLoop *agent.AgentLoop,
msgBus *bus.MessageBus,
Expand Down
4 changes: 3 additions & 1 deletion config/config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@
},
"gateway": {
"host": "127.0.0.1",
"port": 18790
"port": 18790,
"token": "",
"password": ""
}
}
27 changes: 19 additions & 8 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ func (al *AgentLoop) Run(ctx context.Context) error {
response = fmt.Sprintf("Error processing message: %v", err)
}

if response != "" {
if response != "" && msg.Channel != "web" {
// Web channel already got final response in processMessage (state "final"); only publish for other channels.
// Check if the message tool already sent a response during this round.
// If so, skip publishing to avoid duplicate messages to the user.
// Use default agent's tools to check (message tool is shared).
alreadySent := false
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent != nil {
Expand All @@ -186,7 +185,6 @@ func (al *AgentLoop) Run(ctx context.Context) error {
}
}
}

if !alreadySent {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
Expand All @@ -201,6 +199,11 @@ func (al *AgentLoop) Run(ctx context.Context) error {
return nil
}

// GetRegistry returns the agent registry for gateway/session resolution.
func (al *AgentLoop) GetRegistry() *AgentRegistry {
return al.registry
}

func (al *AgentLoop) Stop() {
al.running.Store(false)
}
Expand Down Expand Up @@ -324,14 +327,15 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
"matched_by": route.MatchedBy,
})

sendResponse := msg.Channel == "web"
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: sessionKey,
Channel: msg.Channel,
ChatID: msg.ChatID,
UserMessage: msg.Content,
DefaultResponse: "I've completed processing but have no response to give.",
EnableSummary: true,
SendResponse: false,
SendResponse: sendResponse,
})
}

Expand Down Expand Up @@ -449,13 +453,20 @@ func (al *AgentLoop) runAgentLoop(ctx context.Context, agent *AgentInstance, opt
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
}

// 8. Optional: send response via bus
// 8. Optional: send response via bus (e.g. web gateway for WebClaw)
if opts.SendResponse {
al.bus.PublishOutbound(bus.OutboundMessage{
out := bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: finalContent,
})
State: "final",
}
if opts.Channel == "web" {
if idx := strings.LastIndex(opts.ChatID, "|"); idx >= 0 && idx < len(opts.ChatID)-1 {
out.RunID = opts.ChatID[idx+1:]
}
}
al.bus.PublishOutbound(out)
}

// 9. Log response
Expand Down
4 changes: 4 additions & 0 deletions pkg/bus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ type InboundMessage struct {
Metadata map[string]string `json:"metadata,omitempty"`
}

// OutboundMessage is sent from the agent loop to channels (e.g. web gateway).
// State and RunID support streaming: "streaming" for chunks, "final" for complete reply.
type OutboundMessage struct {
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
State string `json:"state,omitempty"` // e.g. "streaming", "final"
RunID string `json:"run_id,omitempty"` // idempotency/run identifier for the turn
}

type MessageHandler func(InboundMessage) error
13 changes: 11 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,18 @@ func (c *ModelConfig) Validate() error {
return nil
}

// WebSessionAgentBinding maps a session key prefix to an agent ID for web channel.
type WebSessionAgentBinding struct {
SessionKeyPrefix string `json:"session_key_prefix"`
AgentID string `json:"agent_id"`
}

type GatewayConfig struct {
Host string `json:"host" env:"PICOCLAW_GATEWAY_HOST"`
Port int `json:"port" env:"PICOCLAW_GATEWAY_PORT"`
Host string `json:"host" env:"PICOCLAW_GATEWAY_HOST"`
Port int `json:"port" env:"PICOCLAW_GATEWAY_PORT"`
Token string `json:"token" env:"PICOCLAW_GATEWAY_TOKEN"`
Password string `json:"password" env:"PICOCLAW_GATEWAY_PASSWORD"`
WebSessionAgentBindings []WebSessionAgentBinding `json:"web_session_agent_bindings,omitempty"`
}

type BraveConfig struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ func DefaultConfig() *Config {
},
},
Gateway: GatewayConfig{
Host: "127.0.0.1",
Port: 18790,
Host: "127.0.0.1",
Port: 18790,
Token: "",
Password: "",
},
Tools: ToolsConfig{
Web: WebToolsConfig{
Expand Down
81 changes: 81 additions & 0 deletions pkg/gateway/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package gateway

import (
"crypto/sha1"
"fmt"
"time"

"github.com/sipeed/picoclaw/pkg/providers"
)

// messageToGateway converts a providers.Message to a WebClaw GatewayMessage (as map for JSON).
// Order of messages is preserved by the caller (session history order).
func messageToGateway(m providers.Message, index int, baseTime int64) map[string]any {
out := map[string]any{
"role": m.Role,
}
if baseTime > 0 {
out["createdAt"] = baseTime + int64(index)*1000
}
// Generate a stable id for the message.
id := msgID(m, index)
out["id"] = id

switch m.Role {
case "user":
out["content"] = []map[string]any{
{"type": "text", "text": m.Content},
}
case "assistant":
var content []map[string]any
if m.Content != "" {
content = append(content, map[string]any{"type": "text", "text": m.Content})
}
for _, tc := range m.ToolCalls {
content = append(content, map[string]any{
"type": "toolCall",
"id": tc.ID,
"name": tc.Name,
"arguments": tc.Arguments,
})
}
if len(content) == 0 {
content = []map[string]any{{"type": "text", "text": ""}}
}
out["content"] = content
case "tool":
// WebClaw expects role "toolResult" for tool results.
out["role"] = "toolResult"
out["toolCallId"] = m.ToolCallID
out["content"] = []map[string]any{
{"type": "text", "text": m.Content},
}
default:
out["content"] = []map[string]any{
{"type": "text", "text": m.Content},
}
}
return out
}

func msgID(m providers.Message, index int) string {
h := sha1.New()
h.Write([]byte(m.Role + m.Content + m.ToolCallID))
for _, tc := range m.ToolCalls {
h.Write([]byte(tc.ID + tc.Name))
}
return fmt.Sprintf("msg-%x-%d", h.Sum(nil)[:8], index)
}

// historyToGatewayMessages converts session history to WebClaw messages array.
// baseTime is used for createdAt when not stored (e.g. 2026-01-01 00:00:00 UTC ms).
func historyToGatewayMessages(history []providers.Message, baseTime int64) []map[string]any {
if baseTime == 0 {
baseTime = time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
}
out := make([]map[string]any, 0, len(history))
for i := range history {
out = append(out, messageToGateway(history[i], i, baseTime))
}
return out
}
Loading