Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 200 additions & 93 deletions packages/insomnia/src/main/network/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from 'node:fs';
import path from 'node:path';

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { getDefaultEnvironment, StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import {
type ClientRequest,
Expand All @@ -19,6 +19,7 @@ import { getAppVersion, getProductName } from '~/common/constants';
import { getMcpMethodFromMessage } from '~/common/mcp-utils';
import { generateId } from '~/common/misc';
import * as models from '~/models';
import { TRANSPORT_TYPES, type TransportType } from '~/models/mcp-request';
import type { McpResponse } from '~/models/mcp-response';
import type { RequestAuthentication, RequestHeader } from '~/models/request';
import { getBasicAuthHeader } from '~/network/basic-auth/get-header';
Expand All @@ -37,21 +38,22 @@ interface CommonMcpOptions {
type OpenMcpHTTPClientConnectionOptions = CommonMcpOptions & {
workspaceId: string;
url: string;
transportType: 'streamable-http';
transportType: typeof TRANSPORT_TYPES.HTTP;
headers: RequestHeader[];
authentication: RequestAuthentication;
};
type OpenMcpStdioClientConnectionOptions = CommonMcpOptions & {
workspaceId: string;
// TODO: should rename to command or urlOrCommand
url: string;
transportType: 'stdio';
transportType: typeof TRANSPORT_TYPES.STDIO;
env: Record<string, string>;
};
export type OpenMcpClientConnectionOptions = OpenMcpHTTPClientConnectionOptions | OpenMcpStdioClientConnectionOptions;
const isOpenMcpHTTPClientConnectionOptions = (
options: OpenMcpClientConnectionOptions,
): options is OpenMcpHTTPClientConnectionOptions => {
return options.transportType === 'streamable-http';
return options.transportType === TRANSPORT_TYPES.HTTP;
};
export interface McpRequestOptions {
requestId: string;
Expand Down Expand Up @@ -205,7 +207,8 @@ const createErrorResponse = async ({
environmentId,
timelinePath,
message,
}: ResponseEventOptions & { message: string }) => {
transportType,
}: ResponseEventOptions & { message: string; transportType: TransportType }) => {
const settings = await models.settings.get();
const responsePatch = {
_id: responseId,
Expand All @@ -214,6 +217,7 @@ const createErrorResponse = async ({
timelinePath,
statusMessage: 'Error',
error: message,
transportType,
};
const res = await models.mcpResponse.create(responsePatch, settings.maxHistoryResponses);
models.requestMeta.updateOrCreateByParentId(requestId, { activeResponseId: res._id });
Expand Down Expand Up @@ -284,6 +288,7 @@ const fetchWithLogging = async (
elapsedTime: performance.now() - start,
timelinePath,
eventLogPath,
transportType: TRANSPORT_TYPES.HTTP,
};
const settings = await models.settings.get();
const res = await models.mcpResponse.create(responsePatch, settings.maxHistoryResponses);
Expand All @@ -305,6 +310,185 @@ const fetchWithLogging = async (
return response;
};

const createStreamableHTTPTransport = (
options: OpenMcpHTTPClientConnectionOptions,
{
responseId,
responseEnvironmentId,
timelinePath,
eventLogPath,
}: {
responseId: string;
responseEnvironmentId: string | null;
timelinePath: string;
eventLogPath: string;
},
) => {
const { url, requestId } = options;
if (!url) {
throw new Error('MCP server url is required');
}

if (!options.authentication.disabled) {
if (options.authentication.type === 'basic') {
const { username, password, useISO88591 } = options.authentication;
const encoding = useISO88591 ? 'latin1' : 'utf8';
options.headers.push(getBasicAuthHeader(username, password, encoding));
}
if (options.authentication.type === 'apikey') {
const { key = '', value = '' } = options.authentication;
options.headers.push({ name: key, value: value });
}
if (options.authentication.type === 'bearer' && options.authentication.token) {
const { token, prefix } = options.authentication;
options.headers.push(getBearerAuthHeader(token, prefix));
}
}
const reduceArrayToLowerCaseKeyedDictionary = (acc: Record<string, string>, { name, value }: RequestHeader) => ({
...acc,
[name.toLowerCase() || '']: value || '',
});
const lowerCasedEnabledHeaders = options.headers
.filter(({ name, disabled }) => Boolean(name) && !disabled)
.reduce(reduceArrayToLowerCaseKeyedDictionary, {});

const mcpServerUrl = new URL(url);
const transport = new StreamableHTTPClientTransport(mcpServerUrl, {
requestInit: {
headers: lowerCasedEnabledHeaders,
},
fetch: (url, init) =>
fetchWithLogging(url, init || {}, {
requestId,
responseId,
environmentId: responseEnvironmentId,
timelinePath,
eventLogPath,
}),
reconnectionOptions: {
maxReconnectionDelay: 30000,
initialReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1.5,
maxRetries: 2,
},
});
transport.onmessage = message => _handleMcpMessage(message, requestId);
return transport;
};

const createStdioTransport = (
options: OpenMcpStdioClientConnectionOptions,
{
responseId,
responseEnvironmentId,
timelinePath,
eventLogPath,
}: {
responseId: string;
responseEnvironmentId: string | null;
timelinePath: string;
eventLogPath: string;
},
) => {
const { url, requestId, env } = options;
const parseResult = parse(url);
if (parseResult.find(arg => typeof arg !== 'string')) {
throw new Error('Invalid command format');
}
const [command, ...args] = parseResult as string[];

const initialTimelines = getInitialTimeline(`STDIO: ${url}`);
// Add stdio-specific timeline info
initialTimelines.push({
value: `Run command: ${url}`,
name: 'HeaderOut',
timestamp: Date.now(),
});
const stringifiedEnv = Object.entries(env)
.map(([key, value]) => `${key}=${value}`)
.join(' ')
.trim();
if (stringifiedEnv) {
initialTimelines.push({
value: `With env: ${stringifiedEnv}`,
name: 'HeaderOut',
timestamp: Date.now(),
});
}
initialTimelines.map(t => timelineFileStreams.get(requestId)?.write(JSON.stringify(t) + '\n'));

const start = performance.now();
const transport = new StdioClientTransport({
command,
args,
env: {
...getDefaultEnvironment(),
...env,
},
stderr: 'pipe',
});

// Capture stderr logs for debugging
const stderrStream = transport.stderr;
stderrStream?.on('data', (chunk: Buffer) => {
const stderrData = chunk.toString().trim();
if (!stderrData) return; // Skip empty lines

// Log stderr output to timeline with appropriate categorization
timelineFileStreams.get(requestId)?.write(
JSON.stringify({
value: stderrData,
name: 'HeaderIn',
timestamp: Date.now(),
}) + '\n',
);
});

// Wrap the original send method to log outgoing requests for stdio transport
const originalSend = transport.send.bind(transport);
transport.send = async (message: JSONRPCMessage & { method: string }) => {
const method = message.method || 'unknown';

// Create response model for initialize message and add process status timeline
if (method === 'initialize') {
// Add process started timeline (similar to HTTP response timeline)
timelineFileStreams
.get(requestId)
?.write(JSON.stringify({ value: 'Process started and ready', name: 'Text', timestamp: Date.now() }) + '\n');

const responsePatch: Partial<McpResponse> = {
_id: responseId,
parentId: requestId,
environmentId: responseEnvironmentId,
url,
elapsedTime: performance.now() - start,
timelinePath,
eventLogPath,
transportType: TRANSPORT_TYPES.STDIO,
};
const settings = await models.settings.get();
const res = await models.mcpResponse.create(responsePatch, settings.maxHistoryResponses);
models.requestMeta.updateOrCreateByParentId(requestId, { activeResponseId: res._id });
}

const requestEvent: McpRequestEvent = {
_id: mcpEventIdGenerator(),
method,
requestId,
type: 'message',
direction: 'OUTGOING',
timestamp: Date.now(),
data: message,
};
eventLogFileStreams.get(requestId)?.write(JSON.stringify(requestEvent) + '\n');

return originalSend(message);
};

transport.onmessage = message => _handleMcpMessage(message, requestId);
return transport;
};

const openMcpClientConnection = async (options: OpenMcpClientConnectionOptions) => {
const { requestId, workspaceId } = options;

Expand Down Expand Up @@ -332,99 +516,20 @@ const openMcpClientConnection = async (options: OpenMcpClientConnectionOptions)
mcpClient.onerror = _error => _handleMcpConnectionError(requestId, _error);
const mcpStateChannel = getMcpStateChannel(requestId);

const createStreamableHTTPTransport = (options: OpenMcpHTTPClientConnectionOptions) => {
const { url, requestId } = options;
if (!url) {
throw new Error('MCP server url is required');
}

if (!options.authentication.disabled) {
if (options.authentication.type === 'basic') {
const { username, password, useISO88591 } = options.authentication;
const encoding = useISO88591 ? 'latin1' : 'utf8';
options.headers.push(getBasicAuthHeader(username, password, encoding));
}
if (options.authentication.type === 'apikey') {
const { key = '', value = '' } = options.authentication;
options.headers.push({ name: key, value: value });
}
if (options.authentication.type === 'bearer' && options.authentication.token) {
const { token, prefix } = options.authentication;
options.headers.push(getBearerAuthHeader(token, prefix));
}
}
const reduceArrayToLowerCaseKeyedDictionary = (acc: Record<string, string>, { name, value }: RequestHeader) => ({
...acc,
[name.toLowerCase() || '']: value || '',
});
const lowerCasedEnabledHeaders = options.headers
.filter(({ name, disabled }) => Boolean(name) && !disabled)
.reduce(reduceArrayToLowerCaseKeyedDictionary, {});

const mcpServerUrl = new URL(url);
const transport = new StreamableHTTPClientTransport(mcpServerUrl, {
requestInit: {
headers: lowerCasedEnabledHeaders,
},
fetch: (url, init) =>
fetchWithLogging(url, init || {}, {
requestId,
try {
const transport = isOpenMcpHTTPClientConnectionOptions(options)
? await createStreamableHTTPTransport(options, {
responseId,
environmentId: responseEnvironmentId,
responseEnvironmentId,
timelinePath,
eventLogPath,
}),
reconnectionOptions: {
maxReconnectionDelay: 30000,
initialReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1.5,
maxRetries: 2,
},
});
transport.onmessage = message => _handleMcpMessage(message, requestId);
return transport;
};

const createStdioTransport = (options: OpenMcpClientConnectionOptions) => {
const { url } = options;
const parseResult = parse(url);
if (parseResult.find(arg => typeof arg !== 'string')) {
throw new Error('Invalid command format');
}
const [command, ...args] = parseResult as string[];
const transport = new StdioClientTransport({
command,
args,
});
transport.onmessage = async message => {
const method = getMcpMethodFromMessage(message);

// TODO[MCP-STDIO]: try to find a better way to unify this with the http transport
if (method === 'initialize') {
const responsePatch: Partial<McpResponse> = {
_id: responseId,
parentId: requestId,
environmentId: responseEnvironmentId,
url: url.toString(),
// elapsedTime: performance.now() - start,
})
: await createStdioTransport(options, {
responseId,
responseEnvironmentId,
timelinePath,
eventLogPath,
};
const settings = await models.settings.get();
const res = await models.mcpResponse.create(responsePatch, settings.maxHistoryResponses);
models.requestMeta.updateOrCreateByParentId(requestId, { activeResponseId: res._id });
}

_handleMcpMessage(message, requestId);
};
return transport;
};

try {
const transport = isOpenMcpHTTPClientConnectionOptions(options)
? await createStreamableHTTPTransport(options)
: await createStdioTransport(options);

});
await mcpClient.connect(transport!);
} catch (error) {
// Log error when connection fails with exception
Expand All @@ -435,10 +540,12 @@ const openMcpClientConnection = async (options: OpenMcpClientConnectionOptions)
timelinePath,
eventLogPath,
message: error.message || 'Something went wrong',
transportType: options.transportType,
});
console.error(`Failed to create ${options.transportType} transport: ${error}`);
return;
}

mcpConnections.set(requestId, mcpClient as McpClient);
const serverCapabilities = mcpClient.getServerCapabilities();
const primitivePromises: Promise<any>[] = [];
Expand Down
Loading
Loading