Skip to content

Commit 295647c

Browse files
BrentLayneleibale
andauthored
fix(clustered pubsub): check that client.isOpen before calling client.disconnect() when unsubscribing (#2687)
* Confirm the client isOpen before disconnecting * Write tests * fix tests * fix tests --------- Co-authored-by: Leibale Eidelman <me@leibale.com>
1 parent 5a96058 commit 295647c

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ export default class RedisClusterSlots<
562562
const client = await this.getPubSubClient();
563563
await unsubscribe(client);
564564

565-
if (!client.isPubSubActive) {
565+
if (!client.isPubSubActive && client.isOpen) {
566566
await client.disconnect();
567567
this.pubSubNode = undefined;
568568
}
@@ -613,7 +613,7 @@ export default class RedisClusterSlots<
613613
const client = await master.pubSubClient;
614614
await unsubscribe(client);
615615

616-
if (!client.isPubSubActive) {
616+
if (!client.isPubSubActive && client.isOpen) {
617617
await client.disconnect();
618618
master.pubSubClient = undefined;
619619
}

packages/client/lib/cluster/index.spec.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,18 @@ describe('Cluster', () => {
235235

236236
assert.equal(cluster.pubSubNode, undefined);
237237
}, GLOBAL.CLUSTERS.OPEN);
238+
239+
testUtils.testWithCluster('concurrent UNSUBSCRIBE does not throw an error (#2685)', async cluster => {
240+
const listener = spy();
241+
await Promise.all([
242+
cluster.subscribe('1', listener),
243+
cluster.subscribe('2', listener)
244+
]);
245+
await Promise.all([
246+
cluster.unsubscribe('1', listener),
247+
cluster.unsubscribe('2', listener)
248+
]);
249+
}, GLOBAL.CLUSTERS.OPEN);
238250

239251
testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
240252
const listener = spy();
@@ -323,6 +335,21 @@ describe('Cluster', () => {
323335
minimumDockerVersion: [7]
324336
});
325337

338+
testUtils.testWithCluster('concurrent SUNSUBCRIBE does not throw an error (#2685)', async cluster => {
339+
const listener = spy();
340+
await Promise.all([
341+
await cluster.sSubscribe('1', listener),
342+
await cluster.sSubscribe('2', listener)
343+
]);
344+
await Promise.all([
345+
cluster.sUnsubscribe('1', listener),
346+
cluster.sUnsubscribe('2', listener)
347+
]);
348+
}, {
349+
...GLOBAL.CLUSTERS.OPEN,
350+
minimumDockerVersion: [7]
351+
});
352+
326353
testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
327354
const SLOT = 10328,
328355
migrating = cluster.slots[SLOT].master,

0 commit comments

Comments
 (0)