Skip to content
This repository was archived by the owner on Nov 9, 2023. It is now read-only.

Commit 203c972

Browse files
author
iantanwx
committed
fix: clean up engine listener
1 parent 320d31f commit 203c972

File tree

2 files changed

+62
-51
lines changed

2 files changed

+62
-51
lines changed

src/createEngineStream.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Duplex } from 'readable-stream';
2-
import { JsonRpcEngine, JsonRpcRequest } from 'json-rpc-engine';
1+
import { Duplex } from "readable-stream";
2+
import { JsonRpcEngine, JsonRpcRequest } from "json-rpc-engine";
33

44
interface EngineStreamOptions {
55
engine: JsonRpcEngine;
@@ -14,16 +14,16 @@ interface EngineStreamOptions {
1414
*/
1515
export default function createEngineStream(opts: EngineStreamOptions): Duplex {
1616
if (!opts || !opts.engine) {
17-
throw new Error('Missing engine parameter!');
17+
throw new Error("Missing engine parameter!");
1818
}
1919

2020
const { engine } = opts;
2121
const stream = new Duplex({ objectMode: true, read, write });
2222
// forward notifications
2323
if (engine.on) {
24-
engine.on('notification', (message) => {
25-
stream.push(message);
26-
});
24+
const onNotification = (message: unknown) => stream.push(message);
25+
engine.on("notification", onNotification);
26+
stream.on("end", () => engine.off("notification", onNotification));
2727
}
2828
return stream;
2929

@@ -34,7 +34,7 @@ export default function createEngineStream(opts: EngineStreamOptions): Duplex {
3434
function write(
3535
req: JsonRpcRequest<unknown>,
3636
_encoding: unknown,
37-
cb: (error?: Error | null) => void,
37+
cb: (error?: Error | null) => void
3838
) {
3939
engine.handle(req, (_err, res) => {
4040
stream.push(res);

test/index.js

Lines changed: 55 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,59 @@
1-
const test = require('tape');
2-
const { JsonRpcEngine } = require('json-rpc-engine');
3-
const { createStreamMiddleware, createEngineStream } = require('../dist');
4-
5-
test('middleware - raw test', (t) => {
1+
const test = require("tape");
2+
const { JsonRpcEngine } = require("json-rpc-engine");
3+
const { createStreamMiddleware, createEngineStream } = require("../dist");
64

5+
test("middleware - raw test", (t) => {
76
const jsonRpcConnection = createStreamMiddleware();
8-
const req = { id: 1, jsonrpc: '2.0', method: 'test' };
9-
const initRes = { id: 1, jsonrpc: '2.0' };
10-
const res = { id: 1, jsonrpc: '2.0', result: 'test' };
7+
const req = { id: 1, jsonrpc: "2.0", method: "test" };
8+
const initRes = { id: 1, jsonrpc: "2.0" };
9+
const res = { id: 1, jsonrpc: "2.0", result: "test" };
1110

1211
// listen for incomming requests
13-
jsonRpcConnection.stream.on('data', (_req) => {
14-
t.equal(req, _req, 'got the expected request');
12+
jsonRpcConnection.stream.on("data", (_req) => {
13+
t.equal(req, _req, "got the expected request");
1514
jsonRpcConnection.stream.write(res);
1615
});
1716

1817
// run middleware, expect end fn to be called
19-
jsonRpcConnection.middleware(req, initRes, () => {
20-
t.fail('should not call next');
21-
}, (err) => {
22-
t.notOk(err, 'should not error');
23-
t.deepEqual(initRes, res, 'got the expected response');
24-
t.end();
25-
});
18+
jsonRpcConnection.middleware(
19+
req,
20+
initRes,
21+
() => {
22+
t.fail("should not call next");
23+
},
24+
(err) => {
25+
t.notOk(err, "should not error");
26+
t.deepEqual(initRes, res, "got the expected response");
27+
t.end();
28+
}
29+
);
2630
});
2731

28-
test('engine to stream - raw test', (t) => {
29-
32+
test("engine to stream - raw test", (t) => {
3033
const engine = new JsonRpcEngine();
3134
engine.push((_req, res, _next, end) => {
32-
res.result = 'test';
35+
res.result = "test";
3336
end();
3437
});
3538

3639
const stream = createEngineStream({ engine });
37-
const req = { id: 1, jsonrpc: '2.0', method: 'test' };
38-
const res = { id: 1, jsonrpc: '2.0', result: 'test' };
40+
const req = { id: 1, jsonrpc: "2.0", method: "test" };
41+
const res = { id: 1, jsonrpc: "2.0", result: "test" };
3942

4043
// listen for incomming requests
41-
stream.on('data', (_res) => {
42-
t.deepEqual(res, _res, 'got the expected response');
44+
stream.on("data", (_res) => {
45+
t.deepEqual(res, _res, "got the expected response");
4346
t.end();
4447
});
4548

46-
stream.on('error', (err) => {
49+
stream.on("error", (err) => {
4750
t.fail(err.message);
4851
});
4952

5053
stream.write(req);
5154
});
5255

53-
test('middleware and engine to stream', (t) => {
54-
56+
test("middleware and engine to stream", (t) => {
5557
// create guest
5658
const engineA = new JsonRpcEngine();
5759
const jsonRpcConnection = createStreamMiddleware();
@@ -60,35 +62,33 @@ test('middleware and engine to stream', (t) => {
6062
// create host
6163
const engineB = new JsonRpcEngine();
6264
engineB.push((_req, res, _next, end) => {
63-
res.result = 'test';
65+
res.result = "test";
6466
end();
6567
});
6668

6769
// connect both
6870
const clientSideStream = jsonRpcConnection.stream;
6971
const hostSideStream = createEngineStream({ engine: engineB });
70-
clientSideStream
71-
.pipe(hostSideStream)
72-
.pipe(clientSideStream);
72+
clientSideStream.pipe(hostSideStream).pipe(clientSideStream);
7373

7474
// request and expected result
75-
const req = { id: 1, jsonrpc: '2.0', method: 'test' };
76-
const res = { id: 1, jsonrpc: '2.0', result: 'test' };
75+
const req = { id: 1, jsonrpc: "2.0", method: "test" };
76+
const res = { id: 1, jsonrpc: "2.0", result: "test" };
7777

7878
engineA.handle(req, (err, _res) => {
79-
t.notOk(err, 'does not error');
80-
t.deepEqual(res, _res, 'got the expected response');
79+
t.notOk(err, "does not error");
80+
t.deepEqual(res, _res, "got the expected response");
8181
t.end();
8282
});
8383
});
8484

85-
test('server notification', (t) => {
85+
test("server notification", (t) => {
8686
t.plan(1);
8787

8888
const jsonRpcConnection = createStreamMiddleware();
89-
const notif = { jsonrpc: '2.0', method: 'test_notif' };
89+
const notif = { jsonrpc: "2.0", method: "test_notif" };
9090

91-
jsonRpcConnection.events.once('notification', (message) => {
91+
jsonRpcConnection.events.once("notification", (message) => {
9292
t.equals(message.method, notif.method);
9393
t.end();
9494
});
@@ -97,21 +97,32 @@ test('server notification', (t) => {
9797
jsonRpcConnection.stream.write(notif);
9898
});
9999

100-
test('server notification in stream', (t) => {
100+
test("server notification in stream", (t) => {
101101
const engine = new JsonRpcEngine();
102102

103103
const stream = createEngineStream({ engine });
104-
const notif = { jsonrpc: '2.0', method: 'test_notif' };
104+
const notif = { jsonrpc: "2.0", method: "test_notif" };
105105

106106
// listen for incomming requests
107-
stream.once('data', (_notif) => {
108-
t.deepEqual(notif, _notif, 'got the expected notification');
107+
stream.once("data", (_notif) => {
108+
t.deepEqual(notif, _notif, "got the expected notification");
109109
t.end();
110110
});
111111

112-
stream.on('error', (err) => {
112+
stream.on("error", (err) => {
113113
t.fail(err.message);
114114
});
115115

116-
engine.emit('notification', notif);
116+
engine.emit("notification", notif);
117+
});
118+
119+
test("clean up listener", (t) => {
120+
const n = 10;
121+
const engine = new JsonRpcEngine();
122+
for (let i = 0; i < n; i++) {
123+
const stream = createEngineStream({ engine });
124+
stream.end();
125+
}
126+
t.equal(engine.listenerCount(), 0);
127+
t.end();
117128
});

0 commit comments

Comments
 (0)