Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming] create NodeWebSocketFactory, refactor code, new tests #1331

Merged
merged 2 commits into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions libraries/botbuilder/src/botFrameworkAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@

import { Activity, ActivityTypes, BotAdapter, BotCallbackHandlerKey, ChannelAccount, ConversationAccount, ConversationParameters, ConversationReference, ConversationsResult, IUserTokenProvider, ResourceResponse, TokenResponse, TurnContext } from 'botbuilder-core';
import { AuthenticationConstants, ChannelValidation, ConnectorClient, EmulatorApiClient, GovernmentConstants, GovernmentChannelValidation, JwtTokenValidation, MicrosoftAppCredentials, SimpleCredentialProvider, TokenApiClient, TokenStatus, TokenApiModels } from 'botframework-connector';
import { IncomingMessage } from 'http';
import * as os from 'os';
import { TokenResolver } from './tokenResolver';
import { IStreamingTransportServer, IReceiveRequest, StreamingResponse, NamedPipeServer, ISocket, WebSocketServer, NodeWebSocket } from 'botframework-streaming';
import { Watershed } from 'watershed';
import {
IReceiveRequest,
ISocket,
IStreamingTransportServer,
NamedPipeServer,
NodeWebSocketFactory,
NodeWebSocketFactoryBase,
StreamingResponse,
WebSocketServer,
} from 'botframework-streaming';
import { StreamingHttpClient } from './streamingHttpClient';

export enum StatusCodes {
Expand Down Expand Up @@ -137,6 +146,11 @@ export interface BotFrameworkAdapterSettings {
* Optional. The option to determine if this adapter accepts WebSocket connections
*/
enableWebSockets?: boolean;

/**
* Optional. Used to pass in a NodeWebSocketFactoryBase instance. Allows bot to accept WebSocket connections.
*/
webSocketFactory?: NodeWebSocketFactoryBase;
}

/**
Expand Down Expand Up @@ -218,8 +232,8 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide

private logic: (context: TurnContext) => Promise<void>;
private streamingServer: IStreamingTransportServer;
private isEmulatingOAuthCards: boolean;

private isEmulatingOAuthCards: boolean;
private webSocketFactory: NodeWebSocketFactoryBase;

/**
* Creates a new instance of the [BotFrameworkAdapter](xref:botbuilder.BotFrameworkAdapter) class.
Expand All @@ -244,6 +258,16 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
this.credentialsProvider = new SimpleCredentialProvider(this.credentials.appId, this.credentials.appPassword);
this.isEmulatingOAuthCards = false;

// If the developer wants to use WebSockets, but didn't provide a WebSocketFactory,
// create a NodeWebSocketFactory.
if (this.settings.enableWebSockets && !this.settings.webSocketFactory) {
this.webSocketFactory = new NodeWebSocketFactory();
}

if (this.settings.webSocketFactory) {
this.webSocketFactory = this.settings.webSocketFactory;
}

// If no channelService or openIdMetadata values were passed in the settings, check the process' Environment Variables for values.
// These values may be set when a bot is provisioned on Azure and if so are required for the bot to properly work in Public Azure or a National Cloud.
this.settings.channelService = this.settings.channelService || process.env[AuthenticationConstants.ChannelService];
Expand Down Expand Up @@ -1174,8 +1198,13 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
throw new Error('Streaming logic needs to be provided to `useWebSocket`');
}

if (!this.webSocketFactory || !this.webSocketFactory.createWebSocket) {
throw new Error('BotFrameworkAdapter must have a WebSocketFactory in order to support streaming.');
}

this.logic = logic;

// Restify-specific check.
if (typeof((res as any).claimUpgrade) !== 'function') {
throw new Error("ClaimUpgrade is required for creating WebSocket connection.");
}
Expand All @@ -1187,10 +1216,9 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
}

const upgrade = (res as any).claimUpgrade();
const ws = new Watershed();
const socket = ws.accept(req, upgrade.socket, upgrade.head);
const socket = this.webSocketFactory.createWebSocket(req as IncomingMessage, upgrade.socket, upgrade.head);

await this.startWebSocket(new NodeWebSocket(socket));
await this.startWebSocket(socket);
}

/**
Expand Down
35 changes: 25 additions & 10 deletions libraries/botbuilder/tests/botFrameworkStreamingAdapter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class TestAdapterSettings {
constructor(appId = undefined, appPassword = undefined, channelAuthTenant, oAuthEndpoint, openIdMetadata, channelServce) {
this.appId = appId;
this.appPassword = appPassword;
this.enableWebSockets = true;
}
}

Expand Down Expand Up @@ -131,21 +132,35 @@ describe('BotFrameworkStreamingAdapter tests', () => {
expect(handler.streamingServer.disconnect()).to.not.throw;
});

it('starts and stops a websocket server', () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let request = new TestRequest();
let response = new TestResponse({ claimUpgrade: 'anything' });
it('starts and stops a websocket server', async () => {
const bot = new ActivityHandler();
const handler = new BotFrameworkAdapter(new TestAdapterSettings());
const request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
request.headers['upgrade'] = 'websocket';
request.headers['sec-websocket-key'] = 'BFlat';
request.headers['sec-websocket-version'] = '13';
request.headers['sec-websocket-protocol'] = '';

expect(handler.useWebSocket(request, response, async (context) => {
const response = new TestResponse({ claimUpgrade: 'anything' });
const fakeSocket = {
unshift: function () { return true; },
write: function (value) { },
on: function (value) { },
read: function () { return new Buffer.from('data', 'utf8'); },
end: function () { return; },
};
response.setClaimUpgrade({ socket: fakeSocket, head: 'websocket' });
await handler.useWebSocket(request, response, async (context) => {
// Route to bot
await bot.run(context);
})).to.not.throw;
});
});

it('returns a connector client', async () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let handler = new BotFrameworkAdapter(new TestAdapterSettings());
let request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down Expand Up @@ -175,7 +190,7 @@ describe('BotFrameworkStreamingAdapter tests', () => {
describe('useWebSocket()', () => {
it('connects', async () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let handler = new BotFrameworkAdapter(new TestAdapterSettings());
let request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down Expand Up @@ -424,7 +439,7 @@ describe('BotFrameworkStreamingAdapter tests', () => {

it('sends a request', async () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let handler = new BotFrameworkAdapter(new TestAdapterSettings());
let request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down
11 changes: 9 additions & 2 deletions libraries/botframework-streaming/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
*/
export { ContentStream } from './contentStream';
export { HttpContent } from './httpContentStream';
export { IStreamingTransportServer, IStreamingTransportClient, ISocket, IReceiveRequest, IReceiveResponse } from './Interfaces'
export { IStreamingTransportServer, IStreamingTransportClient, ISocket, IReceiveRequest, IReceiveResponse } from './interfaces';
export { NamedPipeClient, NamedPipeServer } from './namedPipe';
export { RequestHandler } from './requestHandler';
export { StreamingRequest } from './streamingRequest';
export { StreamingResponse } from './streamingResponse';
export { SubscribableStream } from './subscribableStream';
export { BrowserWebSocket, NodeWebSocket, WebSocketClient, WebSocketServer } from './webSocket';
export {
BrowserWebSocket,
NodeWebSocket,
NodeWebSocketFactory,
NodeWebSocketFactoryBase,
WebSocketClient,
WebSocketServer,
} from './webSocket';
10 changes: 10 additions & 0 deletions libraries/botframework-streaming/src/webSocket/factories/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

export * from './nodeWebSocketFactory';
export * from './nodeWebSocketFactoryBase';
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { IncomingMessage } from 'http';
import { Socket } from 'net';

import { NodeWebSocket } from '../nodeWebSocket';
import { NodeWebSocketFactoryBase } from './nodeWebSocketFactoryBase';

export class NodeWebSocketFactory extends NodeWebSocketFactoryBase {
constructor() {
super();
}

/**
* Creates a NodeWebSocket instance.
* @param req
* @param socket
* @param head
*/
public createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): NodeWebSocket {
const s = new NodeWebSocket();
s.create(req, socket, head);

return s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { IncomingMessage } from 'http';
import { Socket } from 'net';
import { ISocket } from '../../interfaces';

export abstract class NodeWebSocketFactoryBase {
public abstract createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): ISocket;
}
1 change: 1 addition & 0 deletions libraries/botframework-streaming/src/webSocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

export * from './browserWebSocket';
export * from './factories';
export * from '../interfaces/ISocket';
export * from './nodeWebSocket';
export * from './webSocketClient';
Expand Down
30 changes: 22 additions & 8 deletions libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import * as http from 'http';
import * as WaterShed from 'watershed';

import { IncomingMessage, request } from 'http';
import { Socket } from 'net';
import { Watershed } from 'watershed';
import { ISocket } from '../interfaces/ISocket';

const SHED = new Watershed();

export class NodeWebSocket implements ISocket {
private readonly waterShedSocket: any;
private waterShedSocket: any;
private connected: boolean;

/**
Expand All @@ -23,6 +27,17 @@ export class NodeWebSocket implements ISocket {
this.connected = !!waterShedSocket;
}

/**
* Create and set a WaterShed WebSocket with an HTTP Request, Socket and Buffer.
* @param req IncomingMessage
* @param socket Socket
* @param head Buffer
*/
public create(req: IncomingMessage, socket: Socket, head: Buffer): void {
this.waterShedSocket = SHED.accept(req, socket, head);
this.connected = true;
}

/**
* True if the socket is currently connected.
*/
Expand All @@ -47,9 +62,8 @@ export class NodeWebSocket implements ISocket {
*/
public async connect(serverAddress, port = 8082): Promise<void> {
// Following template from https://github.com/joyent/node-watershed#readme
let shed = new WaterShed.Watershed();
let wskey = shed.generateKey();
let options = {
const wskey = SHED.generateKey();
const options = {
port: port,
hostname: serverAddress,
headers: {
Expand All @@ -58,10 +72,10 @@ export class NodeWebSocket implements ISocket {
'Sec-WebSocket-Version': '13'
}
};
let req = http.request(options);
const req = request(options);
req.end();
req.on('upgrade', function(res, socket, head): void {
shed.connect(res, socket, head, wskey);
SHED.connect(res, socket, head, wskey);
});

this.connected = true;
Expand Down
77 changes: 77 additions & 0 deletions libraries/botframework-streaming/tests/NodeWebSocket.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
const { NodeWebSocket } = require('../');
const { expect } = require('chai');
const { FauxSock, TestRequest } = require('./helpers');

describe('NodeSocket', () => {
it('creates a new NodeSocket', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns).to.be.instanceOf(NodeWebSocket);
expect(ns.close()).to.not.be.undefined;
});

it('requires a valid URL', () => {
try {
const ns = new NodeWebSocket(new FauxSock);
} catch (error) {
expect(error.message).to.equal('Invalid URL: fakeURL');
}
});

it('starts out connected', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns.isConnected()).to.be.true;
});

it('writes to the socket', () => {
const ns = new NodeWebSocket(new FauxSock);
const buff = Buffer.from('hello');
expect(ns.write(buff)).to.not.throw;
});

it('attempts to open a connection', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns.connect().catch((error) => {
expect(error.message).to.equal('connect ECONNREFUSED 127.0.0.1:8082');
}));
});

it('can set message handlers on the socket', () => {
const sock = new FauxSock();
const ns = new NodeWebSocket(sock);
expect(sock.textHandler).to.be.undefined;
expect(sock.binaryHandler).to.be.undefined;
expect(ns.setOnMessageHandler(() => { })).to.not.throw;
expect(sock.textHandler).to.not.be.undefined;
expect(sock.binaryHandler).to.not.be.undefined;
});

it('can set error handler on the socket', () => {
const sock = new FauxSock();
const ns = new NodeWebSocket(sock);
expect(sock.errorHandler).to.be.undefined;
expect(ns.setOnErrorHandler(() => { })).to.not.throw;
expect(sock.errorHandler).to.not.be.undefined;
});

it('can set end handler on the socket', () => {
const sock = new FauxSock();
const ns = new NodeWebSocket(sock);
expect(sock.endHandler).to.be.undefined;
expect(ns.setOnCloseHandler(() => { })).to.not.throw;
expect(sock.endHandler).to.not.be.undefined;
});

it('create() should be successful and set a WebSocket', () => {
const sock = new FauxSock();
const nodeSocket = new NodeWebSocket();
const request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
request.headers['upgrade'] = 'websocket';
request.headers['sec-websocket-key'] = 'BFlat';
request.headers['sec-websocket-version'] = '13';
request.headers['sec-websocket-protocol'] = '';
nodeSocket.create(request, sock, Buffer.from([]));
nodeSocket.waterShedSocket.destroy();
});
});
Loading