Skip to content

Commit

Permalink
feat(websocket): wrap firefox web socket too
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelfeldman committed Mar 28, 2020
1 parent 6903496 commit ca4869c
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ export function guid(): string {

// 'onmessage' handler must be installed synchronously when 'onopen' callback is invoked to
// avoid missing incoming messages.
export async function connectToWebsocket<T>(url: string, onopen: (transport: ConnectionTransport) => Promise<T>): Promise<T> {
export async function connectToWebsocket<T>(url: string, onopen: (transport: ConnectionTransport) => Promise<T> | T): Promise<T> {
const transport = new WebSocketTransport(url);
return new Promise<T>((fulfill, reject) => {
transport._ws.addEventListener('open', async () => fulfill(await onopen(transport)));
Expand Down
7 changes: 1 addition & 6 deletions src/server/chromium.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ export class Chromium implements BrowserType<CRBrowser> {
const chromeExecutable = executablePath || this._executablePath;
if (!chromeExecutable)
throw new Error(`No executable path is specified. Pass "executablePath" option directly.`);
let browserServer: BrowserServer | undefined = undefined;
const { launchedProcess, gracefullyClose } = await launchProcess({
executablePath: chromeExecutable,
args: chromeArguments,
Expand Down Expand Up @@ -123,6 +122,7 @@ export class Chromium implements BrowserType<CRBrowser> {
});

let transport: PipeTransport | undefined = undefined;
let browserServer: BrowserServer | undefined = undefined;
transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close());
browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? wrapTransportWithWebSocket(transport, port || 0) : null);
return { browserServer, transport };
Expand Down Expand Up @@ -251,11 +251,6 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number

socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest;
if (parsedMessage.method.startsWith('Backend')) {
// Add backend domain handler here.
return;
}

// If message has sessionId, pass through.
if (parsedMessage.sessionId) {
transport.send(parsedMessage);
Expand Down
138 changes: 134 additions & 4 deletions src/server/firefox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import * as ws from 'ws';
import { ConnectOptions, LaunchType } from '../browser';
import { BrowserContext } from '../browserContext';
import { TimeoutError } from '../errors';
Expand All @@ -29,6 +30,7 @@ import * as platform from '../platform';
import { BrowserServer } from './browserServer';
import { BrowserArgOptions, BrowserType, LaunchOptions } from './browserType';
import { launchProcess, waitForLine } from './processLauncher';
import { ConnectionTransport, ProtocolRequest, SequenceNumberMixer } from '../transport';

const mkdtempAsync = platform.promisify(fs.mkdtemp);

Expand Down Expand Up @@ -97,17 +99,16 @@ export class Firefox implements BrowserType<FFBrowser> {
}

if (!ignoreDefaultArgs)
firefoxArguments.push(...this._defaultArgs(options, launchType, userDataDir!, port || 0));
firefoxArguments.push(...this._defaultArgs(options, launchType, userDataDir!, 0));
else if (Array.isArray(ignoreDefaultArgs))
firefoxArguments.push(...this._defaultArgs(options, launchType, userDataDir!, port || 0).filter(arg => !ignoreDefaultArgs.includes(arg)));
firefoxArguments.push(...this._defaultArgs(options, launchType, userDataDir!, 0).filter(arg => !ignoreDefaultArgs.includes(arg)));
else
firefoxArguments.push(...args);

const firefoxExecutable = executablePath || this._executablePath;
if (!firefoxExecutable)
throw new Error(`No executable path is specified. Pass "executablePath" option directly.`);

let browserServer: BrowserServer | undefined = undefined;
const { launchedProcess, gracefullyClose } = await launchProcess({
executablePath: firefoxExecutable,
args: firefoxArguments,
Expand Down Expand Up @@ -140,7 +141,10 @@ export class Firefox implements BrowserType<FFBrowser> {

const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Firefox!`);
const match = await waitForLine(launchedProcess, launchedProcess.stdout, /^Juggler listening on (ws:\/\/.*)$/, timeout, timeoutError);
const browserWSEndpoint = match[1];
const innerEndpoint = match[1];

let browserServer: BrowserServer | undefined = undefined;
const browserWSEndpoint = launchType === 'server' ? (await platform.connectToWebsocket(innerEndpoint, t => wrapTransportWithWebSocket(t, port || 0))) : innerEndpoint;
browserServer = new BrowserServer(launchedProcess, gracefullyClose, browserWSEndpoint);
return browserServer;
}
Expand Down Expand Up @@ -185,3 +189,129 @@ export class Firefox implements BrowserType<FFBrowser> {
}
}

function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number): string {
const server = new ws.Server({ port });
const guid = platform.guid();
const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>();
const pendingBrowserContextCreations = new Set<number>();
const pendingBrowserContextDeletions = new Map<number, string>();
const browserContextIds = new Map<string, ws>();
const sessionIds = new Map<string, ws>();
const sockets = new Set<ws>();

transport.onmessage = message => {
if (typeof message.id === 'number') {
// Process command response.
const value = idMixer.take(message.id);
if (!value)
return;
const { id, socket } = value;

if (socket.readyState === ws.CLOSING) {
if (pendingBrowserContextCreations.has(id)) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,
method: 'Browser.removeBrowserContext',
params: { browserContextId: message.result.browserContextId }
});
}
return;
}

if (pendingBrowserContextCreations.has(message.id)) {
// Browser.createBrowserContext response -> establish context attribution.
browserContextIds.set(message.result.browserContextId, socket);
pendingBrowserContextCreations.delete(message.id);
}

const deletedContextId = pendingBrowserContextDeletions.get(message.id);
if (deletedContextId) {
// Browser.removeBrowserContext response -> remove context attribution.
browserContextIds.delete(deletedContextId);
pendingBrowserContextDeletions.delete(message.id);
}

message.id = id;
socket.send(JSON.stringify(message));
return;
}

// Process notification response.
const { method, params, sessionId } = message;
if (sessionId) {
const socket = sessionIds.get(sessionId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
socket.send(JSON.stringify(message));
return;
}
if (method === 'Browser.attachedToTarget') {
const socket = browserContextIds.get(params.targetInfo.browserContextId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
sessionIds.set(params.sessionId, socket);
socket.send(JSON.stringify(message));
return;
}
if (method === 'Browser.detachedFromTarget') {
const socket = sessionIds.get(params.sessionId);
sessionIds.delete(params.sessionId);
if (socket && socket.readyState !== ws.CLOSING)
socket.send(JSON.stringify(message));
return;
}
};

transport.onclose = () => {
for (const socket of sockets) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};

server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
return;
}
sockets.add(socket);

socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString());
const { id, method, params, sessionId } = parsedMessage;
const seqNum = idMixer.generate({ id, socket });
transport.send({ ...parsedMessage, id: seqNum });
if (method === 'Browser.createBrowserContext')
pendingBrowserContextCreations.add(seqNum);
if (method === 'Browser.removeBrowserContext')
pendingBrowserContextDeletions.set(seqNum, params.browserContextId);
});

socket.on('close', (socket as any).__closeListener = () => {
for (const [browserContextId, s] of browserContextIds) {
if (s === socket) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,
method: 'Browser.removeBrowserContext',
params: { browserContextId }
});
browserContextIds.delete(browserContextId);
}
}
sockets.delete(socket);
});
});

const address = server.address();
if (typeof address === 'string')
return address + '/' + guid;
return 'ws://127.0.0.1:' + address.port + '/' + guid;
}

39 changes: 11 additions & 28 deletions src/server/webkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import * as os from 'os';
import { helper } from '../helper';
import { kBrowserCloseMessageId } from '../webkit/wkConnection';
import { LaunchOptions, BrowserArgOptions, BrowserType } from './browserType';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, SequenceNumberMixer } from '../transport';
import * as ws from 'ws';
import { ConnectOptions, LaunchType } from '../browser';
import { BrowserServer } from './browserServer';
Expand Down Expand Up @@ -163,23 +163,6 @@ const mkdtempAsync = platform.promisify(fs.mkdtemp);

const WEBKIT_PROFILE_PATH = path.join(os.tmpdir(), 'playwright_dev_profile-');

class SequenceNumberMixer<V> {
static _lastSequenceNumber = 1;
private _values = new Map<number, V>();

generate(value: V): number {
const sequenceNumber = ++SequenceNumberMixer._lastSequenceNumber;
this._values.set(sequenceNumber, value);
return sequenceNumber;
}

take(sequenceNumber: number): V | undefined {
const value = this._values.get(sequenceNumber);
this._values.delete(sequenceNumber);
return value;
}
}

function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number): string {
const server = new ws.Server({ port });
const guid = platform.guid();
Expand Down Expand Up @@ -265,6 +248,16 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number
}
};

transport.onclose = () => {
for (const socket of sockets) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};

server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
Expand Down Expand Up @@ -302,16 +295,6 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number
});
});

transport.onclose = () => {
for (const socket of sockets) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};

const address = server.address();
if (typeof address === 'string')
return address + '/' + guid;
Expand Down
17 changes: 17 additions & 0 deletions src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,20 @@ export class DeferWriteTransport implements ConnectionTransport {
this._delegate.close();
}
}

export class SequenceNumberMixer<V> {
static _lastSequenceNumber = 1;
private _values = new Map<number, V>();

generate(value: V): number {
const sequenceNumber = ++SequenceNumberMixer._lastSequenceNumber;
this._values.set(sequenceNumber, value);
return sequenceNumber;
}

take(sequenceNumber: number): V | undefined {
const value = this._values.get(sequenceNumber);
this._values.delete(sequenceNumber);
return value;
}
}

0 comments on commit ca4869c

Please sign in to comment.