Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry-pick to master] add generic socket closure handling #1664

Merged
merged 3 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions libraries/botbuilder/src/botFrameworkAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide

}

/**
* Used in streaming contexts to check if the streaming connection is still open for the bot to send activities.
*/
public get isStreamingConnectionOpen(): boolean {
return this.streamingServer.isConnected;
}

/**
* Asynchronously resumes a conversation with a user, possibly after some time has gone by.
*
Expand Down Expand Up @@ -835,6 +842,9 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
throw new Error(`BotFrameworkAdapter.sendActivity(): missing conversation id.`);
}
if (activity && BotFrameworkAdapter.isStreamingServiceUrl(activity.serviceUrl)) {
if (!this.isStreamingConnectionOpen) {
throw new Error('BotFrameworkAdapter.sendActivities(): Unable to send activity as Streaming connection is closed.');
}
TokenResolver.checkForOAuthCards(this, context, activity as Activity);
}
const client = this.getOrCreateConnectorClient(context, activity.serviceUrl, this.credentials);
Expand Down
7 changes: 6 additions & 1 deletion libraries/botbuilder/src/streaming/streamingHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ export class StreamingHttpClient implements HttpClient {
*/
public async sendRequest(httpRequest: WebResource): Promise<HttpOperationResponse> {
if (!httpRequest) {
throw new Error('SendRequest invalid parameter: httpRequest should be provided');
throw new Error('StreamingHttpClient.sendRequest(): missing "httpRequest" parameter');
}
if (!this.server.isConnected) {
throw new Error('StreamingHttpClient.sendRequest(): Streaming connection is disconnected, and the request could not be sent.');

}

const request = this.mapHttpRequestToProtocolRequest(httpRequest);
request.path = request.path.substring(request.path.indexOf('/v3'));
const res = await this.server.send(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const { Socket } = require('net');

const { expect } = require('chai');
const { spy } = require('sinon');
const { ActivityHandler, ActivityTypes } = require('botbuilder-core');
const { ActivityHandler, ActivityTypes, TurnContext } = require('botbuilder-core');

const { BotFrameworkAdapter, StatusCodes } = require('../../');

Expand Down Expand Up @@ -50,6 +50,25 @@ describe('BotFrameworkAdapter Streaming tests', () => {
expect(adapter.streamingServer.disconnect()).to.not.throw;
});

it('sendActivities should throw an error if streaming connection is closed.', async () => {
const activity = {
serviceUrl: 'urn:botframework:WebSocket:wss://beep.com',
type: 'message'
};
const reply = {
conversation: { id: 'convo1' },
...activity
};

const adapter = new BotFrameworkAdapter({});
adapter.streamingServer = { isConnected: false };
try {
await adapter.sendActivities(new TurnContext(adapter, activity), [reply]);
} catch (err) {
expect(err.message).contains('BotFrameworkAdapter.sendActivities(): Unable to send activity as Streaming connection is closed.');
}
});

it('starts and stops a websocket server', async () => {
const bot = new ActivityHandler();
const adapter = new BotFrameworkAdapter(new TestAdapterSettings());
Expand Down
40 changes: 40 additions & 0 deletions libraries/botbuilder/tests/streaming/streamingHttpClient.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const { expect } = require('chai');
const { StreamingHttpClient } = require('../../lib');

describe('StreamingHttpClient', function() {
this.timeout(3000);

it('should construct when provided a server', () => {
const server = { isConnected: true };
const client = new StreamingHttpClient(server);
expect(client.server).to.equal(server);
});

it('should throw an error if missing the "server" parameter', () => {
try {
new StreamingHttpClient();
} catch (err) {
expect(err.message).to.contain('StreamingHttpClient: Expected server.');
}
});

it('should throw an error on sendRequest if missing "httpRequest" parameter', async () => {
const client = new StreamingHttpClient({});
try {
await client.sendRequest();
} catch (err) {
expect(err).to.be.instanceOf(Error);
expect(err.message).to.contain('StreamingHttpClient.sendRequest(): missing "httpRequest" parameter');
}
});

it('should throw an error on sendRequest if internal server is not connected', async () => {
const client = new StreamingHttpClient({});
try {
await client.sendRequest({});
} catch (err) {
expect(err).to.be.instanceOf(Error);
expect(err.message).to.contain('StreamingHttpClient.sendRequest(): Streaming connection is disconnected');
}
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ export interface IStreamingTransportServer {
start(): Promise<string>;
disconnect(): void;
send(request: StreamingRequest): Promise<IReceiveResponse>;
isConnected?: boolean;
}
11 changes: 11 additions & 0 deletions libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ export class NamedPipeServer implements IStreamingTransportServer {
* @param autoReconnect Optional setting to determine if the client sould attempt to reconnect automatically on disconnection events. Defaults to true.
*/
public constructor(baseName: string, requestHandler?: RequestHandler, autoReconnect: boolean = true) {
if (!baseName) {
throw new TypeError('NamedPipeServer: Missing baseName parameter');
}

this._baseName = baseName;
this._requestHandler = requestHandler;
this._autoReconnect = autoReconnect;
Expand All @@ -51,6 +55,13 @@ export class NamedPipeServer implements IStreamingTransportServer {
this._receiver.disconnected = this.onConnectionDisconnected.bind(this);
}

/**
* Returns true if currently connected.
*/
public get isConnected(): boolean {
return !!(this._receiver.isConnected && this._sender.isConnected);
}

/**
* Used to establish the connection used by this server and begin listening for incoming messages.
*
Expand Down
13 changes: 13 additions & 0 deletions libraries/botframework-streaming/src/webSocket/webSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class WebSocketServer implements IStreamingTransportServer {
private readonly _protocolAdapter: ProtocolAdapter;
private readonly _webSocketTransport: WebSocketTransport;
private _closedSignal;
private _socket: ISocket;

/**
* Creates a new instance of the [WebSocketServer](xref:botframework-streaming.WebSocketServer) class.
Expand All @@ -38,6 +39,11 @@ export class WebSocketServer implements IStreamingTransportServer {
* @param requestHandler Optional [RequestHandler](xref:botframework-streaming.RequestHandler) to process incoming messages received by this server.
*/
public constructor(socket: ISocket, requestHandler?: RequestHandler) {
if (!socket) {
throw new TypeError('WebSocketServer: Missing socket parameter');
}

this._socket = socket;
this._webSocketTransport = new WebSocketTransport(socket);
this._requestHandler = requestHandler;

Expand All @@ -53,6 +59,13 @@ export class WebSocketServer implements IStreamingTransportServer {
this._closedSignal = (x: string): string => { return x; };
}

/**
* Examines the stored ISocket and returns true if the socket connection is open.
*/
public get isConnected(): boolean {
return this._socket.isConnected;
}

/**
* Used to establish the connection used by this server and begin listening for incoming messages.
*
Expand Down
17 changes: 17 additions & 0 deletions libraries/botframework-streaming/tests/NamedPipe.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,15 @@ describe('Streaming Extensions NamedPipe Library Tests', () => {
expect(server.disconnect()).to.not.throw;
});

it('throws a TypeError during construction if missing the "baseName" parameter', () => {
try {
new np.NamedPipeServer();
} catch (err) {
expect(err).to.be.instanceOf(TypeError);
expect(err.message).to.contain('NamedPipeServer: Missing baseName parameter');
}
});

it('starts the server without throwing', () => {
let server = new np.NamedPipeServer('pipeA', new protocol.RequestHandler(), false);
expect(server).to.be.instanceOf(np.NamedPipeServer);
Expand All @@ -362,6 +371,14 @@ describe('Streaming Extensions NamedPipe Library Tests', () => {
expect(server.disconnect()).to.not.throw;
});

it('returns true if isConnected === true on _receiver & _sender', () => {
const server = new np.NamedPipeServer('pipeisConnected', new protocol.RequestHandler(), false);

expect(server.isConnected).to.be.false;
server._receiver = { isConnected: true };
server._sender = { isConnected: true };
expect(server.isConnected).to.be.true;
});

it('sends without throwing', (done) => {
let server = new np.NamedPipeServer('pipeA', new protocol.RequestHandler(), false);
Expand Down
9 changes: 9 additions & 0 deletions libraries/botframework-streaming/tests/WebSocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ describe('Streaming Extensions WebSocket Library Tests', () => {
expect(server.disconnect()).to.not.throw;
});

it('throws a TypeError during construction if missing the "socket" parameter', () => {
try {
new ws.WebSocketServer();
} catch (err) {
expect(err).to.be.instanceOf(TypeError);
expect(err.message).to.contain('WebSocketServer: Missing socket parameter');
}
});

it('connects', (done) => {
let server = new ws.WebSocketServer(new FauxSock, new protocol.RequestHandler());
expect(server).to.be.instanceOf(ws.WebSocketServer);
Expand Down