Skip to content

Commit

Permalink
feat: Subscribe to multiple WebSocket channels at once (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
bennycode authored Mar 20, 2020
1 parent 8d9979c commit ae9db0a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 25 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, message => {
// 6. Receive message from WebSocket channel
console.log(`Received message of type "${message.type}".`, message);
// 7. Unsubscribe from WebSocket channel
client.ws.unsubscribe(channel);
client.ws.unsubscribe([channel]);
// 8. Disconnect & end program
client.ws.disconnect();
});
Expand All @@ -96,7 +96,7 @@ client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, message => {
// 4. Connect to WebSocket
await client.ws.connect();
// 5. Subscribe to WebSocket channel
client.ws.subscribe(channel);
client.ws.subscribe([channel]);
})();
```

Expand Down
40 changes: 24 additions & 16 deletions src/client/WebSocketClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ describe('WebSocketClient', () => {

client.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
expect(tickerMessage.trade_id).toBe(3526965);
client.unsubscribe({
name: WebSocketChannelName.TICKER,
product_ids: productIds,
});
client.unsubscribe([
{
name: WebSocketChannelName.TICKER,
product_ids: productIds,
},
]);
});

client.on(WebSocketEvent.ON_MESSAGE, event => {
Expand All @@ -85,10 +87,12 @@ describe('WebSocketClient', () => {

await client.connect();

client.subscribe({
name: WebSocketChannelName.TICKER,
product_ids: productIds,
});
client.subscribe([
{
name: WebSocketChannelName.TICKER,
product_ids: productIds,
},
]);
});

it('receives typed messages from "matches" channel', async done => {
Expand Down Expand Up @@ -116,10 +120,12 @@ describe('WebSocketClient', () => {

client.on(WebSocketEvent.ON_MESSAGE_MATCHES, message => {
expect(message.trade_id).toBe(9713921);
client.unsubscribe({
name: WebSocketChannelName.MATCHES,
product_ids: productIds,
});
client.unsubscribe([
{
name: WebSocketChannelName.MATCHES,
product_ids: productIds,
},
]);
});

client.on(WebSocketEvent.ON_MESSAGE, event => {
Expand All @@ -130,10 +136,12 @@ describe('WebSocketClient', () => {

await client.connect();

client.subscribe({
name: WebSocketChannelName.MATCHES,
product_ids: productIds,
});
client.subscribe([
{
name: WebSocketChannelName.MATCHES,
product_ids: productIds,
},
]);
});
});

Expand Down
22 changes: 15 additions & 7 deletions src/client/WebSocketClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ export enum WebSocketResponseType {
* go through the order lifecycle.
*/
FULL_ACTIVATE = 'activate',
/**
* Latest match between two orders.
*/
LAST_MATCH = 'last_match',
}

export type WebSocketResponse = {type: WebSocketResponseType} & WebSocketMessage;
Expand Down Expand Up @@ -153,6 +157,8 @@ export type WebSocketMatchMessage = {
type: WebSocketResponseType.FULL_MATCH;
};

export type WebSocketLastMatchMessage = Omit<WebSocketMatchMessage, 'type'> & {type: WebSocketResponseType.LAST_MATCH};

export enum WebSocketEvent {
ON_CLOSE = 'WebSocketEvent.ON_CLOSE',
ON_ERROR = 'WebSocketEvent.ON_ERROR',
Expand All @@ -169,7 +175,10 @@ export interface WebSocketClient {

on(event: WebSocketEvent.ON_MESSAGE, listener: (response: WebSocketResponse) => void): this;

on(event: WebSocketEvent.ON_MESSAGE_MATCHES, listener: (matchMessage: WebSocketMatchMessage) => void): this;
on(
event: WebSocketEvent.ON_MESSAGE_MATCHES,
listener: (matchMessage: WebSocketLastMatchMessage | WebSocketMatchMessage) => void
): this;

on(event: WebSocketEvent.ON_MESSAGE_TICKER, listener: (tickerMessage: WebSocketTickerMessage) => void): this;

Expand Down Expand Up @@ -222,6 +231,7 @@ export class WebSocketClient extends EventEmitter {
this.emit(WebSocketEvent.ON_MESSAGE_TICKER, response);
break;
case WebSocketResponseType.FULL_MATCH:
case WebSocketResponseType.LAST_MATCH:
this.emit(WebSocketEvent.ON_MESSAGE_MATCHES, response);
break;
}
Expand Down Expand Up @@ -252,18 +262,16 @@ export class WebSocketClient extends EventEmitter {
this.socket.send(JSON.stringify(message));
}

subscribe(channel: WebSocketChannel): void {
subscribe(channels: WebSocketChannel[]): void {
this.sendMessage({
channels: [channel],
product_ids: channel.product_ids,
channels,
type: WebSocketRequestType.SUBSCRIBE,
});
}

unsubscribe(channel: WebSocketChannel): void {
unsubscribe(channels: WebSocketChannel[]): void {
this.sendMessage({
channels: [channel],
product_ids: channel.product_ids,
channels,
type: WebSocketRequestType.UNSUBSCRIBE,
});
}
Expand Down

0 comments on commit ae9db0a

Please sign in to comment.