Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocketClient: fix unexpected eof + improve subscribe & query #54

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 63 additions & 108 deletions src/client/WebSocketClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import WebSocket from 'ws';
import { hashToHex } from '../util/hash';
import { TendermintQuery } from '../util/TendermintQuery';

type Callback = (data: TendermintSubscriptionResponse) => void;

Expand Down Expand Up @@ -28,112 +29,54 @@ export type TendermintEventType =
| 'ValidBlock'
| 'Vote';

type TendermintQueryOperand = string | number | Date;

export interface TendermintQuery {
[k: string]:
| TendermintQueryOperand
| ['>', number | Date]
| ['<', number | Date]
| ['<=', number | Date]
| ['>=', number | Date]
| ['CONTAINS', string]
| ['EXISTS'];
}

const escapeSingleQuotes = (str: string) => str.replace(/'/g, "\\'");

function makeQueryParams(query: TendermintQuery): string {
const queryBuilder: string[] = [];
for (const key of Object.keys(query)) {
let queryItem: string;
const value = query[key];
// if value is scalar
if (!Array.isArray(value)) {
switch (typeof value) {
case 'number':
queryItem = `${key}=${value}`;
break;
case 'string':
queryItem = `${key}='${escapeSingleQuotes(value)}'`;
break;
default:
// Date
queryItem = `${key}=${value.toISOString()}`;
}
} else {
switch (value[0]) {
case '>':
case '<':
case '<=':
case '>=':
if (typeof value[1] !== 'number') {
queryItem = `${key}${value[0]}${value[1].toISOString()}`;
} else {
queryItem = `${key}${value[0]}${value[1]}`;
}
break;
case 'CONTAINS':
queryItem = `${key} CONTAINS '${escapeSingleQuotes(value[1])}'`;
break;
case 'EXISTS':
queryItem = `${key} EXISTS`;
break;
}
}
queryBuilder.push(queryItem);
}
return queryBuilder.join(' AND ');
}

/**
* An object repesenting a connection to a Terra node's WebSocket RPC endpoint.
* This allows for subscribing to Tendermint events through WebSocket.
*
* ### Events
* **error** emitted when error raises
* **connect** emitted after connection establishment
* **reconnect** emitted upon every attempt of reconnection
* **destroyed** emitted when socket has been destroyed
* - **error** emitted when error raises
* - **connect** emitted after connection establishment
* - **reconnect** emitted upon every attempt of reconnection
* - **destroyed** emitted when socket has been destroyed
*
* ### Example
*
* ```ts
* import { WebSocketClient } from '@terra-money/terra.js';
* import { TendermintQuery, WebSocketClient } from '@terra-money/terra.js';
*
* const wsclient = new WebSocketClient("ws://localhost:26657/websocket");
* wsclient.start();
* wsclient.on('connect', () => {
* wsclient.subscribe('NewBlock', new TendermintQuery(), (data) => {
* console.log(data.value);
*
* wsclient.subscribe('NewBlock', {}, (data) => {
* console.log(data.value);
*
* // close after receiving one block.
* wsclient.destroy();
* })
* // close after receiving one block.
* wsclient.destroy();
* });
*
* wsclient.subscribe(
* 'Tx',
* {
* 'message.action': 'send',
* 'message.sender': ['CONTAINS', 'terra1...'],
* },
* (data) => {
* console.log(data.value);
* wsclient.subscribe(
* 'Tx',
* new TendermintQuery()
* .exact('message.action', 'send')
* .contains('message.sender', 'terra1...'),
* (data) => {
* console.log(data.value);
*
* // close after receiving one send Tx
* wsclient.destroy();
* // close after receiving one send Tx
* wsclient.destroy();
* },
* );
* });
*
* wsclient.start();
* ```
*/
export class WebSocketClient extends EventEmitter {
public isConnected: boolean;
private _connected: boolean;
private reconnectTimeoutId?: NodeJS.Timeout;
private queryParams?: string;
private callback?: Callback;
private callbacks = new Map<number, (data: any) => void>();
private shouldAttemptReconnect: boolean;
private socket!: WebSocket;
private _reconnectCount: number;
private _nextSubId = 1;

/**
* WebSocketClient constructor
Expand All @@ -149,7 +92,7 @@ export class WebSocketClient extends EventEmitter {
) {
super();
this._reconnectCount = this.reconnectCount;
this.isConnected = false;
this._connected = false;
this.shouldAttemptReconnect = !!this.reconnectInterval;
}

Expand All @@ -171,41 +114,42 @@ export class WebSocketClient extends EventEmitter {
this.socket.onerror = () => undefined;
}

send(data: any) {
if (this.socket) {
this.socket.send(JSON.stringify(data));
}
return this;
}

private onOpen() {
this.isConnected = true;
this._connected = true;
this.emit('connect');
// reset reconnectCount after connection establishment
this._reconnectCount = this.reconnectCount;

this.socket.send(
JSON.stringify({
jsonrpc: '2.0',
method: 'subscribe',
params: [this.queryParams],
id: 1,
})
);
}

private onMessage(message: WebSocket.MessageEvent) {
const data = message.data.toString();

// ignore empty messages. fixes "unexpected EOF"
if (!data.trim()) return;

try {
const parsedData = JSON.parse(message.data.toString());
if (!('result' in parsedData && 'id' in parsedData)) {
throw new Error('Invalid message format');
}

if (
this.callback &&
parsedData.result &&
parsedData.result.query === this.queryParams
) {
// this.emit('message', parsedData.result.data);
this.callback(parsedData.result.data);
if (parsedData.result?.data) {
this.callbacks.get(parsedData.id)?.(parsedData.result.data);
}
} catch (err) {
this.emit('error', err);
}
}

private onClose() {
this.isConnected = false;
this._connected = false;

if (
this.shouldAttemptReconnect &&
Expand All @@ -229,12 +173,19 @@ export class WebSocketClient extends EventEmitter {
event: TendermintEventType,
query: TendermintQuery,
callback: Callback
): void {
this.queryParams = makeQueryParams({
'tm.event': event,
...query,
) {
const id = this._nextSubId++;
query = query.clone().exact('tm.event', event);

this.send({
jsonrpc: '2.0',
method: 'subscribe',
params: [query.toString()],
id,
});
this.callback = callback;
this.callbacks.set(id, callback);

return this;
}

public subscribeTx(query: TendermintQuery, callback: Callback): void {
Expand All @@ -245,4 +196,8 @@ export class WebSocketClient extends EventEmitter {

this.subscribe('Tx', query, newCallback);
}

get isConnected() {
return this._connected;
}
}
57 changes: 57 additions & 0 deletions src/util/TendermintQuery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const escape = (str: string) => str.replace(/\\/g, '\\\\').replace(/'/g, "\\'");

export class TendermintQuery {
private _query: string[] = [];

getValue(value: number | string | Date) {
if (typeof value === 'number') {
return value;
} else if (typeof value === 'string') {
return `'${escape(value)}'`;
} else {
return value.toISOString();
}
}

exact(field: string, value: number | string | Date) {
this._query.push(`${field}=${this.getValue(value)}`);
return this;
}

compare(field: string, op: `${'<' | '>'}${'' | '='}`, value: number | Date) {
this._query.push(`${field}${op}${this.getValue(value)}`);
return this;
}

exists(field: string) {
this._query.push(`${field} EXISTS`);
return this;
}

contains(field: string, value: string) {
this._query.push(`${field} CONTAINS '${escape(value)}'`);
return this;
}

clone() {
const q = new TendermintQuery();
q._query = this._query.slice();
return q;
}

toString() {
return this._query.join(' AND ');
}

static AND(lhs: TendermintQuery, rhs: TendermintQuery) {
const q = new TendermintQuery();
q._query.push(`(${lhs}) AND (${rhs})`);
return q;
}

static OR(lhs: TendermintQuery, rhs: TendermintQuery) {
const q = new TendermintQuery();
q._query.push(`(${lhs}) OR (${rhs})`);
return q;
}
}
1 change: 1 addition & 0 deletions src/util/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './hash';
export * from './contract';
export * from './TendermintQuery';