Skip to content
This repository was archived by the owner on Nov 9, 2023. It is now read-only.

Commit 10763e4

Browse files
committed
Add duplex engine stream
1 parent 8c64407 commit 10763e4

File tree

9 files changed

+368
-78
lines changed

9 files changed

+368
-78
lines changed

jest.config.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ module.exports = {
2121
// An object that configures minimum threshold enforcement for coverage results
2222
coverageThreshold: {
2323
global: {
24-
branches: 69.23,
25-
functions: 88.88,
26-
lines: 93.75,
27-
statements: 93.75,
24+
branches: 81.81,
25+
functions: 89.47,
26+
lines: 96.64,
27+
statements: 96.64,
2828
},
2929
},
3030

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
"test:watch": "jest --watch"
2525
},
2626
"dependencies": {
27-
"@metamask/safe-event-emitter": "^2.0.0",
27+
"@metamask/utils": "^2.1.0",
2828
"readable-stream": "^2.3.3"
2929
},
3030
"devDependencies": {
@@ -48,7 +48,7 @@
4848
"eslint-plugin-prettier": "^3.3.1",
4949
"jest": "^27.5.1",
5050
"jest-it-up": "^2.0.2",
51-
"json-rpc-engine": "^6.1.0",
51+
"json-rpc-engine": "./json-rpc-engine-6.1.2.tgz",
5252
"prettier": "^2.2.1",
5353
"prettier-plugin-packagejson": "^2.2.17",
5454
"rimraf": "^3.0.2",

src/createDuplexJsonRpcStream.ts

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import {
2+
hasProperty,
3+
isJsonRpcRequest,
4+
isObject,
5+
JsonRpcId,
6+
JsonRpcNotification,
7+
JsonRpcRequest,
8+
JsonRpcResponse,
9+
RuntimeObject,
10+
} from '@metamask/utils';
11+
import { Duplex } from 'readable-stream';
12+
import {
13+
DuplexJsonRpcEngine,
14+
JsonRpcMiddleware,
15+
JsonRpcNotificationHandler,
16+
} from 'json-rpc-engine';
17+
import { ErrorMessages, IdMapValue } from './utils';
18+
19+
type StreamCallback = (error?: Error | null) => void;
20+
21+
interface DuplexJsonRpcStreamOptions {
22+
receiverMiddleware?: JsonRpcMiddleware<unknown, unknown>[];
23+
receiverNotificationHandler?: JsonRpcNotificationHandler<unknown>;
24+
senderMiddleware?: JsonRpcMiddleware<unknown, unknown>[];
25+
senderNotificationHandler?: JsonRpcNotificationHandler<unknown>;
26+
}
27+
28+
/**
29+
* Foobar, bar baz.
30+
*
31+
* @param options - Options bag.
32+
* @returns The stream wrapping the duplex JSON-RPC engine.
33+
*/
34+
export default function createDuplexJsonRpcStream(
35+
options: DuplexJsonRpcStreamOptions,
36+
) {
37+
const {
38+
receiverMiddleware = [],
39+
receiverNotificationHandler = () => undefined,
40+
senderMiddleware = [],
41+
senderNotificationHandler,
42+
} = options;
43+
44+
const outgoingIdMap: Map<JsonRpcId, IdMapValue> = new Map();
45+
const stream = new Duplex({
46+
objectMode: true,
47+
read: () => undefined,
48+
write: processMessage,
49+
});
50+
51+
const sendNotification = (notification: JsonRpcNotification<unknown>) => {
52+
stream.push(notification);
53+
return undefined;
54+
};
55+
56+
const _senderNotificationHandler = senderNotificationHandler
57+
? async (notification: JsonRpcNotification<unknown>) => {
58+
await senderNotificationHandler(notification);
59+
return sendNotification(notification);
60+
}
61+
: sendNotification;
62+
63+
const engine = new DuplexJsonRpcEngine({
64+
receiverNotificationHandler,
65+
senderNotificationHandler: _senderNotificationHandler,
66+
});
67+
68+
receiverMiddleware.forEach((middleware) =>
69+
engine.addReceiverMiddleware(middleware),
70+
);
71+
72+
senderMiddleware.forEach((middleware) =>
73+
engine.addSenderMiddleware(middleware),
74+
);
75+
76+
engine.addSenderMiddleware((req, res, _next, end) => {
77+
// write req to stream
78+
stream.push(req);
79+
// register request on id map if
80+
if (isJsonRpcRequest(req)) {
81+
outgoingIdMap.set(req.id, { res, end });
82+
}
83+
});
84+
85+
return { duplexEngine: engine, duplexEngineStream: stream };
86+
87+
/**
88+
* Writes a JSON-RPC object to the stream.
89+
*
90+
* @param message - The message to write to the stream.
91+
* @param _encoding - The stream encoding, not used.
92+
* @param cb - The stream write callback.
93+
* @returns Nothing.
94+
*/
95+
function processMessage(
96+
message: unknown,
97+
_encoding: unknown,
98+
cb: StreamCallback,
99+
): void {
100+
let err: Error | null = null;
101+
try {
102+
if (!isObject(message)) {
103+
throw new Error('not an object');
104+
} else if (isResponse(message)) {
105+
receiveResponse(message);
106+
} else if (isRequest(message)) {
107+
return receiveRequest(message, cb);
108+
} else {
109+
throw new Error('neither a response nor request');
110+
}
111+
} catch (_err) {
112+
err = _err as Error;
113+
}
114+
115+
// continue processing stream
116+
return cb(err);
117+
}
118+
119+
/**
120+
* Forwards a JSON-RPC request or notification to the receiving pipeline.
121+
* Pushes any response from the pipeline to the stream.
122+
*
123+
* @param req - The request or notification to receive.
124+
* @param cb - The stream write callback.
125+
*/
126+
function receiveRequest(
127+
req: JsonRpcRequest<unknown> | JsonRpcNotification<unknown>,
128+
cb: StreamCallback,
129+
) {
130+
// TypeScript defaults to the notification overload and we don't get a
131+
// response unless we cast.
132+
engine
133+
.receive(req as JsonRpcRequest<unknown>)
134+
.then((response) => {
135+
if (response) {
136+
stream.push(response);
137+
}
138+
cb();
139+
})
140+
.catch((error) => cb(error));
141+
}
142+
143+
/**
144+
* Receives a response to a request sent via the sending pipeline.
145+
*
146+
* @param res - The response to receive.
147+
*/
148+
function receiveResponse(res: JsonRpcResponse<unknown>) {
149+
const context = outgoingIdMap.get(res.id);
150+
if (!context) {
151+
throw new Error(ErrorMessages.unknownResponse(res.id));
152+
}
153+
154+
// Copy response received from the stream unto original response object,
155+
// which will be returned by the engine on this side.
156+
Object.assign(context.res, res);
157+
158+
outgoingIdMap.delete(res.id);
159+
// Prevent internal stream handler from catching errors from this callback.
160+
setTimeout(context.end);
161+
}
162+
}
163+
164+
/**
165+
* A type guard for {@link JsonRpcResponse}.
166+
*
167+
* @param message - The object to type check.
168+
* @returns The type check result.
169+
*/
170+
function isResponse(
171+
message: RuntimeObject,
172+
): message is JsonRpcResponse<unknown> {
173+
return hasProperty(message, 'result') || hasProperty(message, 'error');
174+
}
175+
176+
/**
177+
* A type guard for {@link JsonRpcRequest} or {@link JsonRpcNotification}.
178+
*
179+
* @param message - The object to type check.
180+
* @returns The type check result.
181+
*/
182+
function isRequest(
183+
message: RuntimeObject,
184+
): message is JsonRpcRequest<unknown> | JsonRpcNotification<unknown> {
185+
return hasProperty(message, 'method');
186+
}

src/createEngineStream.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import { JsonRpcRequest } from '@metamask/utils';
12
import { Duplex } from 'readable-stream';
2-
import { JsonRpcEngine, JsonRpcRequest } from 'json-rpc-engine';
3+
import { JsonRpcEngine } from 'json-rpc-engine';
34

45
interface EngineStreamOptions {
56
engine: JsonRpcEngine;
@@ -13,18 +14,8 @@ interface EngineStreamOptions {
1314
* @returns The stream wrapping the engine.
1415
*/
1516
export default function createEngineStream(opts: EngineStreamOptions): Duplex {
16-
if (!opts || !opts.engine) {
17-
throw new Error('Missing engine parameter!');
18-
}
19-
2017
const { engine } = opts;
2118
const stream = new Duplex({ objectMode: true, read: () => undefined, write });
22-
// forward notifications
23-
if (engine.on) {
24-
engine.on('notification', (message) => {
25-
stream.push(message);
26-
});
27-
}
2819
return stream;
2920

3021
/**

src/createStreamMiddleware.ts

Lines changed: 20 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,7 @@
1-
import SafeEventEmitter from '@metamask/safe-event-emitter';
21
import { Duplex } from 'readable-stream';
3-
import {
4-
JsonRpcEngineNextCallback,
5-
JsonRpcEngineEndCallback,
6-
JsonRpcNotification,
7-
JsonRpcMiddleware,
8-
JsonRpcRequest,
9-
PendingJsonRpcResponse,
10-
} from 'json-rpc-engine';
11-
12-
interface IdMapValue {
13-
req: JsonRpcRequest<unknown>;
14-
res: PendingJsonRpcResponse<unknown>;
15-
next: JsonRpcEngineNextCallback;
16-
end: JsonRpcEngineEndCallback;
17-
}
18-
19-
interface IdMap {
20-
[requestId: string]: IdMapValue;
21-
}
2+
import { JsonRpcMiddleware } from 'json-rpc-engine';
3+
import { isJsonRpcRequest, JsonRpcId, JsonRpcResponse } from '@metamask/utils';
4+
import { ErrorMessages, IdMapValue } from './utils';
225

236
/**
247
* Creates a JsonRpcEngine middleware with an associated Duplex stream and
@@ -29,28 +12,28 @@ interface IdMap {
2912
* @returns The event emitter, middleware, and stream.
3013
*/
3114
export default function createStreamMiddleware() {
32-
const idMap: IdMap = {};
15+
const idMap: Map<JsonRpcId, IdMapValue> = new Map();
3316
const stream = new Duplex({
3417
objectMode: true,
3518
read: () => undefined,
3619
write: processMessage,
3720
});
3821

39-
const events = new SafeEventEmitter();
40-
4122
const middleware: JsonRpcMiddleware<unknown, unknown> = (
4223
req,
4324
res,
44-
next,
25+
_next,
4526
end,
4627
) => {
4728
// write req to stream
4829
stream.push(req);
4930
// register request on id map
50-
idMap[req.id as unknown as string] = { req, res, next, end };
31+
if (isJsonRpcRequest(req)) {
32+
idMap.set(req.id, { res, end });
33+
}
5134
};
5235

53-
return { events, middleware, stream };
36+
return { middleware, stream };
5437

5538
/**
5639
* Writes a JSON-RPC object to the stream.
@@ -60,21 +43,17 @@ export default function createStreamMiddleware() {
6043
* @param cb - The stream write callback.
6144
*/
6245
function processMessage(
63-
res: PendingJsonRpcResponse<unknown>,
46+
res: JsonRpcResponse<unknown>,
6447
_encoding: unknown,
6548
cb: (error?: Error | null) => void,
6649
) {
6750
let err: Error | null = null;
6851
try {
69-
const isNotification = !res.id;
70-
if (isNotification) {
71-
processNotification(res as unknown as JsonRpcNotification<unknown>);
72-
} else {
73-
processResponse(res);
74-
}
52+
processResponse(res);
7553
} catch (_err) {
7654
err = _err as Error;
7755
}
56+
7857
// continue processing stream
7958
cb(err);
8059
}
@@ -84,26 +63,18 @@ export default function createStreamMiddleware() {
8463
*
8564
* @param res - The response to process.
8665
*/
87-
function processResponse(res: PendingJsonRpcResponse<unknown>) {
88-
const context = idMap[res.id as unknown as string];
66+
function processResponse(res: JsonRpcResponse<unknown>) {
67+
const context = idMap.get(res.id);
8968
if (!context) {
90-
throw new Error(`StreamMiddleware - Unknown response id "${res.id}"`);
69+
throw new Error(ErrorMessages.unknownResponse(res.id));
9170
}
9271

93-
delete idMap[res.id as unknown as string];
94-
// copy whole res onto original res
72+
// Copy response received from the stream unto original response object,
73+
// which will be returned by the engine on this side.
9574
Object.assign(context.res, res);
96-
// run callback on empty stack,
97-
// prevent internal stream-handler from catching errors
98-
setTimeout(context.end);
99-
}
10075

101-
/**
102-
* Processes a JSON-RPC notification.
103-
*
104-
* @param notif - The notification to process.
105-
*/
106-
function processNotification(notif: JsonRpcNotification<unknown>) {
107-
events.emit('notification', notif);
76+
idMap.delete(res.id);
77+
// Prevent internal stream handler from catching errors from this callback.
78+
setTimeout(context.end);
10879
}
10980
}

0 commit comments

Comments
 (0)