Skip to content

Commit de06d8d

Browse files
committed
fix: tests
1 parent b958264 commit de06d8d

File tree

8 files changed

+482
-28
lines changed

8 files changed

+482
-28
lines changed

src/RedisQueue.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ export class RedisQueue extends EventEmitter<EventMap>
503503
await this.stop();
504504
await this.clear();
505505
this.destroyWriter();
506+
await this.unsubscribe();
506507
}
507508

508509
/**

src/UDPClusterManager.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,54 @@ export class UDPClusterManager extends ClusterManager {
319319
}, timeout);
320320
}
321321

322+
/**
323+
* Destroys the UDPClusterManager by closing all opened network connections
324+
* and safely destroying all blocking sockets
325+
*
326+
* @returns {Promise<void>}
327+
* @throws {Error}
328+
*/
329+
public async destroy(): Promise<void> {
330+
// Close all UDP sockets and clean up connections
331+
const socketKeys = Object.keys(UDPClusterManager.sockets);
332+
const closePromises: Promise<void>[] = [];
333+
334+
for (const key of socketKeys) {
335+
const socket = UDPClusterManager.sockets[key];
336+
337+
if (socket) {
338+
closePromises.push(new Promise<void>(((socketKey: string): any =>
339+
((resolve: any, reject: any): any => {
340+
try {
341+
// Check if socket has close method and is not already closed
342+
if (typeof socket.close === 'function') {
343+
// Remove all event listeners to prevent memory leaks
344+
socket.removeAllListeners();
345+
346+
// Close the socket
347+
socket.close(() => {
348+
socket.unref();
349+
delete UDPClusterManager.sockets[socketKey];
350+
resolve();
351+
});
352+
} else {
353+
resolve();
354+
}
355+
} catch (error) {
356+
// Handle any errors during socket closure gracefully
357+
reject(error as Error);
358+
}
359+
}) as any)(key)));
360+
}
361+
}
362+
363+
// Wait for all sockets to close
364+
await Promise.all(closePromises);
365+
366+
// Clear the static sockets record
367+
UDPClusterManager.sockets = {};
368+
}
369+
322370
private static selectNetworkInterface(
323371
options: Pick<
324372
UDPClusterManagerOptions,

src/profile.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ export function logDebugInfo({
165165
if (debugTime) {
166166
// noinspection TypeScriptUnresolvedFunction
167167
const time = parseInt(
168-
((process.hrtime as any).bigint() - start) as any,
168+
((process.hrtime as any).bigint() - BigInt(start)) as any,
169169
10,
170170
) / 1000;
171171
let timeStr: string;

test/RedisQueue.ts

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
import { logger } from './mocks';
2525
import { expect } from 'chai';
26-
import { RedisQueue, uuid } from '../src';
26+
import { RedisQueue, uuid, IMQMode } from '../src';
2727
import Redis from 'ioredis';
2828

2929
process.setMaxListeners(100);
@@ -43,13 +43,16 @@ describe('RedisQueue', function() {
4343
});
4444

4545
describe('constructor()', () => {
46-
it('should not throw', () => {
47-
expect(() => new (<any>RedisQueue)()).not.to.throw(Error);
48-
expect(() => new RedisQueue('IMQUnitTests')).not.to.throw(Error);
49-
expect(() => new RedisQueue('IMQUnitTests', {}))
46+
it('should not throw', async () => {
47+
const instances: RedisQueue[] = [];
48+
expect(() => instances.push(new (<any>RedisQueue)())).not.to.throw(Error);
49+
expect(() => instances.push(new RedisQueue('IMQUnitTests'))).not.to.throw(Error);
50+
expect(() => instances.push(new RedisQueue('IMQUnitTests', {})))
5051
.not.to.throw(Error);
51-
expect(() => new RedisQueue('IMQUnitTests', { useGzip: true }))
52+
expect(() => instances.push(new RedisQueue('IMQUnitTests', { useGzip: true })))
5253
.not.to.throw(Error);
54+
55+
await Promise.all(instances.map(instance => instance.destroy()));
5356
});
5457
});
5558

@@ -58,6 +61,7 @@ describe('RedisQueue', function() {
5861
const rq = new (<any>RedisQueue)();
5962
try { await rq.start() }
6063
catch (err) { expect(err).to.be.instanceof(TypeError) }
64+
rq.destroy().catch();
6165
});
6266

6367
it('should create reader connection', async () => {
@@ -106,7 +110,7 @@ describe('RedisQueue', function() {
106110
await rq.start();
107111
} catch (err) { passed = false }
108112
expect(passed).to.be.true;
109-
rq.destroy().catch();
113+
await rq.destroy();
110114
});
111115
});
112116

@@ -240,4 +244,99 @@ describe('RedisQueue', function() {
240244
});
241245
});
242246

247+
describe('processCleanup()', () => {
248+
it('should perform cleanup when cleanup option is enabled', async () => {
249+
const rq: any = new RedisQueue(uuid(), {
250+
logger,
251+
cleanup: true,
252+
cleanupFilter: 'test*'
253+
});
254+
await rq.start();
255+
256+
// Call processCleanup directly
257+
const result = await rq.processCleanup();
258+
expect(result).to.equal(rq);
259+
260+
await rq.destroy();
261+
});
262+
263+
it('should return early when cleanup option is disabled', async () => {
264+
const rq: any = new RedisQueue(uuid(), {
265+
logger,
266+
cleanup: false
267+
});
268+
await rq.start();
269+
270+
const result = await rq.processCleanup();
271+
expect(result).to.be.undefined;
272+
273+
await rq.destroy();
274+
});
275+
});
276+
277+
describe('lock/unlock methods', () => {
278+
it('should handle lock/unlock when writer is null', async () => {
279+
const rq: any = new RedisQueue(uuid(), { logger });
280+
// Don't start, so writer will be null
281+
282+
const lockResult = await rq.lock();
283+
expect(lockResult).to.be.false;
284+
285+
const unlockResult = await rq.unlock();
286+
expect(unlockResult).to.be.false;
287+
288+
const isLockedResult = await rq.isLocked();
289+
expect(isLockedResult).to.be.false;
290+
291+
await rq.destroy();
292+
});
293+
294+
it('should handle lock/unlock operations', async () => {
295+
const rq: any = new RedisQueue(uuid(), { logger });
296+
await rq.start();
297+
298+
// Test locking
299+
const lockResult = await rq.lock();
300+
expect(lockResult).to.be.a('boolean');
301+
302+
// Test checking if locked
303+
const isLockedResult = await rq.isLocked();
304+
expect(isLockedResult).to.be.a('boolean');
305+
306+
// Test unlocking
307+
const unlockResult = await rq.unlock();
308+
expect(unlockResult).to.be.a('boolean');
309+
310+
await rq.destroy();
311+
});
312+
});
313+
314+
describe('utility methods', () => {
315+
it('should test isPublisher and isWorker methods', async () => {
316+
const publisherQueue = new RedisQueue(uuid(), { logger }, IMQMode.PUBLISHER);
317+
const workerQueue = new RedisQueue(uuid(), { logger }, IMQMode.WORKER);
318+
319+
expect(publisherQueue.isPublisher()).to.be.true;
320+
expect(publisherQueue.isWorker()).to.be.false;
321+
322+
expect(workerQueue.isPublisher()).to.be.false;
323+
expect(workerQueue.isWorker()).to.be.true;
324+
325+
await workerQueue.destroy();
326+
await publisherQueue.destroy();
327+
});
328+
329+
it('should test key and lockKey methods', async () => {
330+
const name = uuid();
331+
const rq: any = new RedisQueue(name, { logger });
332+
333+
expect(rq.key).to.be.a('string');
334+
expect(rq.key).to.include(name);
335+
336+
expect(rq.lockKey).to.be.a('string');
337+
expect(rq.lockKey).to.include('watch:lock');
338+
339+
await rq.destroy();
340+
});
341+
});
243342
});

0 commit comments

Comments
 (0)