Skip to content

Commit 3fb412b

Browse files
committed
fix: persist leader wait for message silence
1 parent 3deafd3 commit 3fb412b

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

src/y-socket-io/y-socket-io.js

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,6 @@ export class YSocketIO {
414414
if (nsp?.sockets.size === 0 && stream) {
415415
this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT)
416416
if (this.namespaceDocMap.has(ns)) this.debouncedPersist(ns, true)
417-
this.persistentLeaderOf.delete(ns)
418417
}
419418
logSocketIO(`disconnecting socket in ${ns}, ${nsp?.sockets.size || 0} remaining`)
420419
}
@@ -475,7 +474,8 @@ export class YSocketIO {
475474
const nsp = this.namespaceMap.get(namespace)
476475
if (!nsp) return
477476
if (nsp.sockets.size === 0 && this.subscriber) {
478-
this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT)
477+
const isLeader = this.persistentLeaderOf.has(namespace)
478+
this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT, isLeader)
479479
}
480480

481481
/** @type {Uint8Array[]} */
@@ -708,13 +708,19 @@ export class YSocketIO {
708708
* @param {string} namespace
709709
* @param {string} stream
710710
* @param {number=} removeAfterWait
711+
* @param {boolean=} extendExisting
711712
*/
712-
cleanupNamespace (namespace, stream, removeAfterWait) {
713+
cleanupNamespace (namespace, stream, removeAfterWait, extendExisting = false) {
713714
if (!removeAfterWait) {
714715
this.awaitingCleanupNamespace.delete(namespace)
715716
return this.cleanupNamespaceImpl(namespace, stream)
716717
}
717-
if (this.awaitingCleanupNamespace.has(namespace)) return
718+
719+
const existingTimer = this.awaitingCleanupNamespace.get(namespace)
720+
if (existingTimer) {
721+
if (!extendExisting) return
722+
clearTimeout(existingTimer)
723+
}
718724

719725
const timer = setTimeout(async () => {
720726
const awaitingPersist = this.awaitingPersistMap.get(namespace)
@@ -739,6 +745,15 @@ export class YSocketIO {
739745
this.namespaceDocMap.get(namespace)?.ydoc.destroy()
740746
this.namespaceDocMap.delete(namespace)
741747
this.namespacePersistentMap.delete(namespace)
748+
this.persistentLeaderOf.delete(namespace)
749+
750+
// clear leader key in redis when we fully clean up
751+
if (this.client) {
752+
const key = this.getLeaderKeyOf(namespace)
753+
this.client.redis
754+
.del(key)
755+
.catch((e) => console.error(e))
756+
}
742757
}
743758

744759
async waitUntilWorkerReady () {
@@ -818,8 +833,11 @@ export class YSocketIO {
818833
const curLeader = await redis.get(key)
819834

820835
// remove orphaned if exist
821-
const aliveClients = this.namespaceMap.get(namespace)?.sockets.size || 0
822-
if (aliveClients === 0) {
836+
const nsp = this.namespaceMap.get(namespace)
837+
const hasDoc = this.namespaceDocMap.has(namespace)
838+
const hasCleanupTimer = this.awaitingCleanupNamespace.has(namespace)
839+
const isActiveHere = Boolean(nsp || hasDoc || hasCleanupTimer)
840+
if (!isActiveHere) {
823841
logSocketIO(`clearing leader heartbeat for [${namespace}] (SID: ${this.serverId})`)
824842
this.persistentLeaderOf.delete(namespace)
825843
continue

0 commit comments

Comments
 (0)