Skip to content

Commit 2ee9199

Browse files
authored
fix: Add heartbeat callback; Move to Sets vs Arrays (#460)
1 parent 527f3fb commit 2ee9199

File tree

3 files changed

+129
-43
lines changed

3 files changed

+129
-43
lines changed

src/RealtimeClient.ts

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ export type RealtimeMessage = {
3838
}
3939

4040
export type RealtimeRemoveChannelResponse = 'ok' | 'timed out' | 'error'
41+
export type HeartbeatStatus =
42+
| 'sent'
43+
| 'ok'
44+
| 'error'
45+
| 'timeout'
46+
| 'disconnected'
4147

4248
const noop = () => {}
4349

@@ -86,7 +92,7 @@ const WORKER_SCRIPT = `
8692
export default class RealtimeClient {
8793
accessTokenValue: string | null = null
8894
apiKey: string | null = null
89-
channels: RealtimeChannel[] = []
95+
channels: Set<RealtimeChannel> = new Set()
9096
endPoint: string = ''
9197
httpEndpoint: string = ''
9298
headers?: { [key: string]: string } = DEFAULT_HEADERS
@@ -96,6 +102,7 @@ export default class RealtimeClient {
96102
heartbeatIntervalMs: number = 25000
97103
heartbeatTimer: ReturnType<typeof setInterval> | undefined = undefined
98104
pendingHeartbeatRef: string | null = null
105+
heartbeatCallback: (status: HeartbeatStatus) => void = noop
99106
ref: number = 0
100107
reconnectTimer: Timer
101108
logger: Function = noop
@@ -268,7 +275,7 @@ export default class RealtimeClient {
268275
* Returns all created channels
269276
*/
270277
getChannels(): RealtimeChannel[] {
271-
return this.channels
278+
return Array.from(this.channels)
272279
}
273280

274281
/**
@@ -279,7 +286,7 @@ export default class RealtimeClient {
279286
channel: RealtimeChannel
280287
): Promise<RealtimeRemoveChannelResponse> {
281288
const status = await channel.unsubscribe()
282-
if (this.channels.length === 0) {
289+
if (this.channels.size === 0) {
283290
this.disconnect()
284291
}
285292
return status
@@ -290,9 +297,13 @@ export default class RealtimeClient {
290297
*/
291298
async removeAllChannels(): Promise<RealtimeRemoveChannelResponse[]> {
292299
const values_1 = await Promise.all(
293-
this.channels.map((channel) => channel.unsubscribe())
300+
Array.from(this.channels).map((channel) => {
301+
this.channels.delete(channel)
302+
return channel.unsubscribe()
303+
})
294304
)
295305
this.disconnect()
306+
296307
return values_1
297308
}
298309

@@ -332,9 +343,18 @@ export default class RealtimeClient {
332343
topic: string,
333344
params: RealtimeChannelOptions = { config: {} }
334345
): RealtimeChannel {
335-
const chan = new RealtimeChannel(`realtime:${topic}`, params, this)
336-
this.channels.push(chan)
337-
return chan
346+
const realtimeTopic = `realtime:${topic}`
347+
const exists = this.getChannels().find(
348+
(c: RealtimeChannel) => c.topic === realtimeTopic
349+
)
350+
351+
if (!exists) {
352+
const chan = new RealtimeChannel(`realtime:${topic}`, params, this)
353+
this.channels.add(chan)
354+
return chan
355+
} else {
356+
return exists
357+
}
338358
}
339359

340360
/**
@@ -394,6 +414,7 @@ export default class RealtimeClient {
394414
*/
395415
async sendHeartbeat() {
396416
if (!this.isConnected()) {
417+
this.heartbeatCallback('disconnected')
397418
return
398419
}
399420
if (this.pendingHeartbeatRef) {
@@ -402,6 +423,7 @@ export default class RealtimeClient {
402423
'transport',
403424
'heartbeat timeout. Attempting to re-establish connection'
404425
)
426+
this.heartbeatCallback('timeout')
405427
this.conn?.close(WS_CLOSE_NORMAL, 'hearbeat timeout')
406428
return
407429
}
@@ -412,9 +434,13 @@ export default class RealtimeClient {
412434
payload: {},
413435
ref: this.pendingHeartbeatRef,
414436
})
437+
this.heartbeatCallback('sent')
415438
await this.setAuth()
416439
}
417440

441+
onHeartbeat(callback: (status: HeartbeatStatus) => void): void {
442+
this.heartbeatCallback = callback
443+
}
418444
/**
419445
* Flushes send buffer
420446
*/
@@ -467,7 +493,7 @@ export default class RealtimeClient {
467493
* @internal
468494
*/
469495
_leaveOpenTopic(topic: string): void {
470-
let dupChannel = this.channels.find(
496+
let dupChannel = Array.from(this.channels).find(
471497
(c) => c.topic === topic && (c._isJoined() || c._isJoining())
472498
)
473499
if (dupChannel) {
@@ -484,9 +510,7 @@ export default class RealtimeClient {
484510
* @internal
485511
*/
486512
_remove(channel: RealtimeChannel) {
487-
this.channels = this.channels.filter(
488-
(c: RealtimeChannel) => c._joinRef() !== channel._joinRef()
489-
)
513+
this.channels.delete(channel)
490514
}
491515

492516
/**
@@ -510,6 +534,10 @@ export default class RealtimeClient {
510534
this.decode(rawMessage.data, (msg: RealtimeMessage) => {
511535
let { topic, event, payload, ref } = msg
512536

537+
if (topic === 'phoenix' && event === 'phx_reply') {
538+
this.heartbeatCallback(msg.payload.status == 'ok' ? 'ok' : 'error')
539+
}
540+
513541
if (ref && ref === this.pendingHeartbeatRef) {
514542
this.pendingHeartbeatRef = null
515543
}
@@ -521,11 +549,13 @@ export default class RealtimeClient {
521549
}`,
522550
payload
523551
)
524-
this.channels
552+
553+
Array.from(this.channels)
525554
.filter((channel: RealtimeChannel) => channel._isMember(topic))
526555
.forEach((channel: RealtimeChannel) =>
527556
channel._trigger(event, payload, ref)
528557
)
558+
529559
this.stateChangeCallbacks.message.forEach((callback) => callback(msg))
530560
})
531561
}

test/channel.test.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
beforeAll,
1010
afterAll,
1111
vi,
12+
it,
1213
} from 'vitest'
1314

1415
import RealtimeClient from '../src/RealtimeClient'
@@ -814,12 +815,12 @@ describe('onClose', () => {
814815
})
815816

816817
test('removes channel from socket', () => {
817-
assert.equal(socket.channels.length, 1)
818-
assert.deepEqual(socket.channels[0], channel)
818+
assert.equal(socket.getChannels().length, 1)
819+
assert.deepEqual(socket.getChannels()[0], channel)
819820

820821
channel._trigger('phx_close')
821822

822-
assert.equal(socket.channels.length, 0)
823+
assert.equal(socket.getChannels().length, 0)
823824
})
824825
})
825826

@@ -1113,13 +1114,13 @@ describe('leave', () => {
11131114

11141115
test("closes channel on 'ok' from server", () => {
11151116
const anotherChannel = socket.channel('another', { three: 'four' })
1116-
assert.equal(socket.channels.length, 2)
1117+
assert.equal(socket.getChannels().length, 2)
11171118

11181119
channel.unsubscribe()
11191120
channel.joinPush.trigger('ok', {})
11201121

1121-
assert.equal(socket.channels.length, 1)
1122-
assert.deepEqual(socket.channels[0], anotherChannel)
1122+
assert.equal(socket.getChannels().length, 1)
1123+
assert.deepEqual(socket.getChannels()[0], anotherChannel)
11231124
})
11241125

11251126
test("sets state to closed on 'ok' event", () => {

0 commit comments

Comments
 (0)