Skip to content

Commit 46d0255

Browse files
authored
Merge pull request #14 from BetterCorp/develop
feat(emit.ts): add support for message deduplication
2 parents 8dde2e9 + 278c9ae commit 46d0255

File tree

3 files changed

+56
-40
lines changed

3 files changed

+56
-40
lines changed

nodejs/src/plugins/events-default/events/emit.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
import { EventEmitter } from "events";
2-
import { IPluginLogger } from '../../../interfaces/logger';
2+
import { IPluginLogger } from "../../../interfaces/logger";
3+
import { randomUUID } from "crypto";
34

45
export default class emit extends EventEmitter {
56
private log: IPluginLogger;
7+
private _lastReceivedMessageIds: Array<string> = [];
8+
private set lastReceivedMessageIds(value: string) {
9+
// remove after 50 messages
10+
if (this._lastReceivedMessageIds.length > 50) {
11+
this._lastReceivedMessageIds.shift();
12+
}
13+
this._lastReceivedMessageIds.push(value);
14+
}
615

716
constructor(log: IPluginLogger) {
817
super();
@@ -22,7 +31,13 @@ export default class emit extends EventEmitter {
2231
"onEvent: {callerPluginName} listening to {pluginName}-{event}",
2332
{ callerPluginName, pluginName, event }
2433
);
25-
this.on(`${pluginName}-${event}`, listener);
34+
this.on(`${pluginName}-${event}`, (args: any) => {
35+
if (this._lastReceivedMessageIds.includes(args.msgID)) {
36+
return;
37+
}
38+
this.lastReceivedMessageIds = args.msgID;
39+
listener(args.data);
40+
});
2641
}
2742

2843
public async emitEvent(
@@ -35,6 +50,9 @@ export default class emit extends EventEmitter {
3550
"emitEvent: {callerPluginName} emitting {pluginName}-{event}",
3651
{ callerPluginName, pluginName, event }
3752
);
38-
this.emit(`${pluginName}-${event}`, args);
53+
this.emit(`${pluginName}-${event}`, {
54+
msgID: randomUUID(),
55+
data: args,
56+
});
3957
}
4058
}

nodejs/src/tests/plugins/events/events/broadcast.ts

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export function broadcast(
1515
afterEach(function () {
1616
emitter.dispose();
1717
});
18-
describe("Emit", async function () {
18+
describe("EmitBroadcast", async function () {
1919
this.timeout(maxTimeoutToExpectAResponse + 10);
2020
this.afterEach((done) => setTimeout(done, maxTimeoutToExpectAResponse));
2121
describe("emitBroadcast", async () => {
@@ -27,26 +27,27 @@ export function broadcast(
2727
//console.log(emitter)
2828
let receiveCounter = 0;
2929
setTimeout(() => {
30-
if (receiveCounter !== 2) assert.fail("Event not received");
31-
else assert.ok(receiveCounter);
30+
if (receiveCounter === 2) return assert.ok(receiveCounter);
31+
if (receiveCounter === 0) return assert.fail("Event not received");
32+
assert.fail("Received " + receiveCounter + " events");
3233
}, maxTimeoutToExpectAResponse);
33-
await emitter.onEvent(
34+
await emitter.onBroadcast(
3435
thisCaller,
3536
thisPlugin,
3637
thisEvent,
3738
async (data: any) => {
3839
receiveCounter++;
3940
}
4041
);
41-
await emitter.onEvent(
42+
await emitter.onBroadcast(
4243
thisCaller,
4344
thisPlugin,
4445
thisEvent,
4546
async (data: any) => {
4647
receiveCounter++;
4748
}
4849
);
49-
await emitter.emitEvent(thisCaller, thisPlugin, thisEvent, [emitData]);
50+
await emitter.emitBroadcast(thisCaller, thisPlugin, thisEvent, [emitData]);
5051
});
5152
it("should be able to emit to events with plugin name defined", async () => {
5253
const thisCaller = randomName();
@@ -83,7 +84,7 @@ export function broadcast(
8384
async (data: any) => {
8485
clearTimeout(emitTimeout);
8586

86-
assert.ok(data);
87+
assert.ok(data[0]);
8788
}
8889
);
8990
await emitter.emitBroadcast(thisCaller, thisCaller, thisEvent, [
@@ -104,7 +105,10 @@ export function broadcast(
104105
async (data: any) => {
105106
clearTimeout(emitTimeout);
106107

107-
assert.equal(data, [0, 1, 2, 3]);
108+
assert.deepEqual(data[0], 0);
109+
assert.deepEqual(data[1], 1);
110+
assert.deepEqual(data[2], 2);
111+
assert.deepEqual(data[3], 3);
108112
}
109113
);
110114
await emitter.emitBroadcast(
@@ -177,7 +181,7 @@ export function broadcast(
177181
async (data: any) => {
178182
clearTimeout(emitTimeout);
179183

180-
assert.strictEqual(data, emitData);
184+
assert.deepEqual(data[0], emitData);
181185
}
182186
);
183187
await emitter.emitBroadcast(thisCaller, thisPlugin, thisEvent, [
@@ -198,7 +202,7 @@ export function broadcast(
198202
async (data: any) => {
199203
clearTimeout(emitTimeout);
200204

201-
assert.strictEqual(data, emitData);
205+
assert.deepEqual(data[0], emitData);
202206
}
203207
);
204208
await emitter.emitBroadcast(thisCaller, thisCaller, thisEvent, [
@@ -293,7 +297,7 @@ export function broadcast(
293297
},
294298
];
295299
for (let typeToTest of typesToTest)
296-
describe(`emit ${typeToTest.name}`, async () => {
300+
describe(`emitBroadcast ${typeToTest.name}`, async () => {
297301
it("should be able to emit to events with plugin name defined", async () => {
298302
const thisCaller = randomName();
299303
const thisPlugin = randomName();
@@ -309,7 +313,7 @@ export function broadcast(
309313
async (data: any) => {
310314
clearTimeout(emitTimeout);
311315

312-
assert.strictEqual(data, typeToTest.data);
316+
assert.deepEqual(data[0], typeToTest.data);
313317
}
314318
);
315319
await emitter.emitBroadcast(thisCaller, thisPlugin, thisEvent, [
@@ -330,7 +334,7 @@ export function broadcast(
330334
async (data: any) => {
331335
clearTimeout(emitTimeout);
332336

333-
assert.strictEqual(data, typeToTest.data);
337+
assert.deepEqual(data[0], typeToTest.data);
334338
}
335339
);
336340
await emitter.emitBroadcast(thisCaller, thisCaller, thisEvent, [

nodejs/src/tests/plugins/events/events/emit.ts

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { EventsBase } from '../../../../events/events';
1+
import { EventsBase } from "../../../../events/events";
22
import assert from "assert";
33
import { randomUUID } from "crypto";
44

@@ -25,35 +25,26 @@ export function emit(
2525
const thisPlugin = randomName();
2626
const thisEvent = randomName();
2727
//console.log(emitter)
28-
let emitTimeout: NodeJS.Timeout | null = setTimeout(() => {
29-
assert.fail("Event not received");
28+
let receiveCounter = 0;
29+
setTimeout(() => {
30+
if (receiveCounter === 1) return assert.ok(receiveCounter);
31+
if (receiveCounter === 0) return assert.fail("Event not received");
32+
assert.fail("Received " + receiveCounter + " events");
3033
}, maxTimeoutToExpectAResponse);
3134
await emitter.onEvent(
3235
thisCaller,
3336
thisPlugin,
3437
thisEvent,
3538
async (data: any) => {
36-
if (emitTimeout === null) {
37-
assert.fail("Event received twice");
38-
return;
39-
}
40-
clearTimeout(emitTimeout);
41-
emitTimeout = null;
42-
assert.ok(data[0]);
39+
receiveCounter++;
4340
}
4441
);
4542
await emitter.onEvent(
4643
thisCaller,
4744
thisPlugin,
4845
thisEvent,
4946
async (data: any) => {
50-
if (emitTimeout === null) {
51-
assert.fail("Event received twice");
52-
return;
53-
}
54-
clearTimeout(emitTimeout);
55-
emitTimeout = null;
56-
assert.ok(data[0]);
47+
receiveCounter++;
5748
}
5849
);
5950
await emitter.emitEvent(thisCaller, thisPlugin, thisEvent, [emitData]);
@@ -91,7 +82,7 @@ export function emit(
9182
async (data: any) => {
9283
clearTimeout(emitTimeout);
9384

94-
assert.ok(data);
85+
assert.ok(data[0]);
9586
}
9687
);
9788
await emitter.emitEvent(thisCaller, thisCaller, thisEvent, [emitData]);
@@ -110,7 +101,10 @@ export function emit(
110101
async (data: any) => {
111102
clearTimeout(emitTimeout);
112103

113-
assert.equal(data, [0, 1, 2, 3]);
104+
assert.deepEqual(data[0], 0);
105+
assert.deepEqual(data[1], 1);
106+
assert.deepEqual(data[2], 2);
107+
assert.deepEqual(data[3], 3);
114108
}
115109
);
116110
await emitter.emitEvent(
@@ -179,7 +173,7 @@ export function emit(
179173
async (data: any) => {
180174
clearTimeout(emitTimeout);
181175

182-
assert.strictEqual(data, emitData);
176+
assert.deepEqual(data[0], emitData);
183177
}
184178
);
185179
await emitter.emitEvent(thisCaller, thisPlugin, thisEvent, [emitData]);
@@ -198,7 +192,7 @@ export function emit(
198192
async (data: any) => {
199193
clearTimeout(emitTimeout);
200194

201-
assert.strictEqual(data, emitData);
195+
assert.deepEqual(data[0], emitData);
202196
}
203197
);
204198
await emitter.emitEvent(thisCaller, thisCaller, thisEvent, [emitData]);
@@ -287,7 +281,7 @@ export function emit(
287281
},
288282
];
289283
for (let typeToTest of typesToTest)
290-
describe(`emit ${typeToTest.name}`, async () => {
284+
describe(`emitEvent ${typeToTest.name}`, async () => {
291285
it("should be able to emit to events with plugin name defined", async () => {
292286
const thisCaller = randomName();
293287
const thisPlugin = randomName();
@@ -303,7 +297,7 @@ export function emit(
303297
async (data: any) => {
304298
clearTimeout(emitTimeout);
305299

306-
assert.strictEqual(data, typeToTest.data);
300+
assert.deepEqual(data[0], typeToTest.data);
307301
}
308302
);
309303
await emitter.emitEvent(thisCaller, thisPlugin, thisEvent, [
@@ -324,7 +318,7 @@ export function emit(
324318
async (data: any) => {
325319
clearTimeout(emitTimeout);
326320

327-
assert.strictEqual(data, typeToTest.data);
321+
assert.deepEqual(data[0], typeToTest.data);
328322
}
329323
);
330324
await emitter.emitEvent(thisCaller, thisCaller, thisEvent, [

0 commit comments

Comments
 (0)