Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ RUN apt-get update && apt-get install -y \
&& rm -rf /var/lib/apt/lists/* \
&& ln -sf /bin/bash /bin/sh

# Install cloudflared for multiple architectures
RUN ARCH=$(dpkg --print-architecture) && \
case ${ARCH} in \
amd64) CF_ARCH="amd64" ;; \
arm64) CF_ARCH="arm64" ;; \
armhf) CF_ARCH="arm" ;; \
*) CF_ARCH="amd64" ;; \
esac && \
curl -L "https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-${CF_ARCH}" -o /usr/local/bin/cloudflared && \
chmod +x /usr/local/bin/cloudflared

# Create a non-root user with home directory and sudo access
RUN addgroup --gid 1001 claude && \
adduser --system --uid 1001 --ingroup claude --home /home/claude claude && \
Expand Down
6 changes: 4 additions & 2 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
},
"workspaces": [
"packages/dispatcher",
"packages/orchestrator",
"packages/orchestrator",
"packages/shared",
"packages/worker"
],
Expand All @@ -32,7 +32,7 @@
"dependencies": {
"@google-cloud/storage": "^7.14.0",
"@kubernetes/client-node": "^1.3.0",
"@modelcontextprotocol/sdk": "^1.11.0",
"@modelcontextprotocol/sdk": "^1.17.4",
"@octokit/graphql": "^8.2.2",
"@octokit/rest": "^21.1.1",
"@octokit/webhooks-types": "^7.6.1",
Expand Down
199 changes: 160 additions & 39 deletions packages/worker/mcp/process-manager-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ interface ProcessInfo {
exitCode?: number;
restartCount: number;
process?: ChildProcess;
port?: number;
tunnelUrl?: string;
tunnelProcess?: ChildProcess;
}

class ProcessManager {
private processes: Map<string, ProcessInfo> = new Map();
private processDir = "/tmp/agent-processes";
private logsDir = "/tmp/claude-logs";
private monitorInterval?: NodeJS.Timeout;
private autoRestart = false;
private autoRestart = true; // Enabled by default

constructor() {
this.init();
// Start monitoring by default with 30 second interval
this.startMonitoring(30000);
}

private async init() {
Expand Down Expand Up @@ -65,7 +70,7 @@ class ProcessManager {
return path.join(this.logsDir, `${id}.log`);
}

async startProcess(id: string, command: string, description: string): Promise<ProcessInfo> {
async startProcess(id: string, command: string, description: string, port?: number): Promise<ProcessInfo> {
if (this.processes.has(id)) {
const existing = this.processes.get(id)!;
if (existing.status === "running" && existing.pid) {
Expand All @@ -80,6 +85,7 @@ class ProcessManager {
status: "starting",
startedAt: new Date().toISOString(),
restartCount: 0,
port,
};

const logPath = this.getLogPath(id);
Expand All @@ -91,6 +97,10 @@ class ProcessManager {
logStream.write(`Command: ${command}\n`);
logStream.write(`Description: ${description}\n`);
logStream.write("---\n");

// Log to worker console
console.log(`[Process Manager] Starting process ${id}: ${description}`);
console.log(`[Process Manager] Command: ${command}`);

const child = spawn("bash", ["-c", command], {
detached: false,
Expand All @@ -103,10 +113,14 @@ class ProcessManager {

child.stdout?.on("data", (data) => {
logStream.write(data);
// Also pipe to worker stdout with process identifier
process.stdout.write(`[Process ${id}] ${data}`);
});

child.stderr?.on("data", (data) => {
logStream.write(data);
// Also pipe to worker stderr with process identifier
process.stderr.write(`[Process ${id}] ${data}`);
});

child.on("exit", async (code, signal) => {
Expand All @@ -117,21 +131,118 @@ class ProcessManager {

logStream.write(`\nProcess ${id} exited with code ${code} at ${info.completedAt}\n`);
logStream.end();

// Log to worker console
console.log(`[Process Manager] Process ${id} exited with code ${code}`);

await this.saveProcessInfo(info);

if (this.autoRestart && info.status === "failed" && info.restartCount < 5) {
console.error(`Process ${id} failed, attempting restart...`);
console.error(`[Process Manager] Process ${id} failed, attempting restart...`);
setTimeout(() => this.restartProcess(id), 5000);
}
});

this.processes.set(id, info);
await this.saveProcessInfo(info);

// Start cloudflared tunnel if port is specified
if (port) {
this.startTunnel(id, port);
}

return info;
}

private async startTunnel(id: string, port: number): Promise<void> {
const info = this.processes.get(id);
if (!info) return;

const tunnelLogPath = path.join(this.logsDir, `${id}-tunnel.log`);
const tunnelLogStream = await import("fs").then(fs =>
fs.createWriteStream(tunnelLogPath, { flags: "a" })
);

tunnelLogStream.write(`Starting cloudflared tunnel for port ${port} at ${new Date().toISOString()}\n`);

// Log to worker console
console.log(`[Process Manager] Starting cloudflared tunnel for process ${id} on port ${port}`);

const tunnelChild = spawn("cloudflared", ["tunnel", "--url", `http://localhost:${port}`], {
detached: false,
stdio: ["ignore", "pipe", "pipe"],
});

info.tunnelProcess = tunnelChild;

// Extract the tunnel URL from cloudflared output
let urlExtracted = false;
const extractTimeout = setTimeout(() => {
if (!urlExtracted) {
tunnelLogStream.write("Failed to extract tunnel URL within 15 seconds\n");
console.error(`Failed to extract tunnel URL for process ${id}`);

// Kill the tunnel process if URL extraction fails
if (info.tunnelProcess) {
try {
process.kill(info.tunnelProcess.pid!, "SIGTERM");
} catch (e) {
// Process already terminated
}
delete info.tunnelProcess;
info.tunnelUrl = undefined;
}
}
}, 15000);

const extractUrl = (data: Buffer) => {
const output = data.toString();
tunnelLogStream.write(output);

// Also pipe tunnel logs to worker stderr with identifier
process.stderr.write(`[Tunnel ${id}] ${output}`);

// Look for the trycloudflare.com URL in the output
const urlMatch = output.match(/https?:\/\/([a-z0-9-]+)\.trycloudflare\.com/);
if (urlMatch && !urlExtracted) {
urlExtracted = true;
clearTimeout(extractTimeout);
const prefix = urlMatch[1];
info.tunnelUrl = `https://${prefix}.peerbot.ai`;
console.log(`[Tunnel ${id}] Established: ${info.tunnelUrl}`);
this.saveProcessInfo(info);
}
};

tunnelChild.stdout?.on("data", extractUrl);
tunnelChild.stderr?.on("data", extractUrl);

tunnelChild.on("exit", (code, signal) => {
clearTimeout(extractTimeout);
tunnelLogStream.write(`\nTunnel process exited with code ${code} at ${new Date().toISOString()}\n`);
tunnelLogStream.end();

if (info.tunnelProcess === tunnelChild) {
delete info.tunnelProcess;
info.tunnelUrl = undefined;
this.saveProcessInfo(info);
}
});

tunnelChild.on("error", (error) => {
clearTimeout(extractTimeout);
tunnelLogStream.write(`Tunnel process error: ${error.message}\n`);
console.error(`Failed to start cloudflared tunnel for process ${id}:`, error);

if (!urlExtracted) {
urlExtracted = true;
info.tunnelUrl = undefined;
delete info.tunnelProcess;
this.saveProcessInfo(info);
}
});
}

async stopProcess(id: string): Promise<void> {
const info = this.processes.get(id);
if (!info) {
Expand All @@ -143,6 +254,17 @@ class ProcessManager {
}

try {
// Stop the tunnel process if it exists
if (info.tunnelProcess && info.tunnelProcess.pid) {
try {
process.kill(info.tunnelProcess.pid, "SIGTERM");
} catch (e) {
// Tunnel process already terminated
}
delete info.tunnelProcess;
info.tunnelUrl = undefined;
}

process.kill(info.pid, "SIGTERM");

// Give process time to terminate gracefully
Expand Down Expand Up @@ -177,7 +299,7 @@ class ProcessManager {
}

info.restartCount++;
return this.startProcess(id, info.command, info.description);
return this.startProcess(id, info.command, info.description, info.port);
}

getStatus(id?: string): ProcessInfo | ProcessInfo[] | null {
Expand Down Expand Up @@ -254,15 +376,43 @@ const server = new McpServer({
// Register tools
server.tool(
"start_process",
"Start a background process with monitoring",
"Start a background process with monitoring and optional tunnel",
{
id: z.string().describe("Unique identifier for the process"),
command: z.string().describe("Command to execute"),
description: z.string().describe("Description of what this process does"),
port: z.number().optional().describe("Optional port to expose via cloudflared tunnel"),
},
async ({ id, command, description }) => {
async ({ id, command, description, port }) => {
try {
const info = await manager.startProcess(id, command, description);
const info = await manager.startProcess(id, command, description, port);

// If port is specified, wait a bit for tunnel URL to be extracted
if (port) {
await new Promise(resolve => setTimeout(resolve, 3000));
const updatedInfo = manager.getStatus(id) as ProcessInfo | null;
if (updatedInfo?.tunnelUrl) {
return {
content: [
{
type: "text",
text: `Started process ${id} (PID: ${info.pid})\nTunnel URL: ${updatedInfo.tunnelUrl}`,
},
],
};
} else if (port) {
// Tunnel failed, suggest using ngrok
return {
content: [
{
type: "text",
text: `Started process ${id} (PID: ${info.pid})\nWarning: Failed to establish cloudflared tunnel. Consider using ngrok as an alternative.`,
},
],
};
}
}

return {
content: [
{
Expand Down Expand Up @@ -375,7 +525,7 @@ server.tool(
Description: ${p.description}
Started: ${p.startedAt}${p.completedAt ? `\n Completed: ${p.completedAt}` : ""}${
p.exitCode !== undefined ? `\n Exit code: ${p.exitCode}` : ""
}
}${p.port ? `\n Port: ${p.port}` : ""}${p.tunnelUrl ? `\n Tunnel URL: ${p.tunnelUrl}` : ""}
Restart count: ${p.restartCount}`
)
.join("\n\n");
Expand Down Expand Up @@ -411,37 +561,8 @@ server.tool(
}
);

server.tool(
"monitor_processes",
"Enable or disable automatic process monitoring and restart",
{
enabled: z.boolean().describe("Enable or disable monitoring"),
interval: z.number().optional().default(30000).describe("Check interval in milliseconds"),
},
async ({ enabled, interval }) => {
if (enabled) {
manager.startMonitoring(interval);
return {
content: [
{
type: "text",
text: `Process monitoring enabled (interval: ${interval}ms)`,
},
],
};
} else {
manager.stopMonitoring();
return {
content: [
{
type: "text",
text: "Process monitoring disabled",
},
],
};
}
}
);
// Process monitoring is enabled by default with 30 second interval
// It will automatically restart failed processes up to 5 times

// Register resources
server.resource(
Expand Down
Loading