Skip to content
This repository was archived by the owner on Oct 9, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,15 @@
"typescript": "^5.7.3",
"vitest": "^2.1.9",
"web-worker": "1.2.0"
},
"exports": {
".": {
"import": "./dist/module/index.js",
"require": "./dist/main/index.js"
},
"./websocket": {
"node": "./src/node.js",
"default": "./src/native.js"
}
}
}
15 changes: 13 additions & 2 deletions src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ export default class RealtimeChannel {
})
} else {
this.unsubscribe()
this.state = CHANNEL_STATES.errored

callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(
Expand All @@ -296,6 +298,7 @@ export default class RealtimeChannel {
}
})
.receive('error', (error: { [key: string]: any }) => {
this.state = CHANNEL_STATES.errored
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(
Expand Down Expand Up @@ -511,8 +514,6 @@ export default class RealtimeChannel {
this._trigger(CHANNEL_EVENTS.close, 'leave', this._joinRef())
}

this.rejoinTimer.reset()
// Destroy joinPush to avoid connection timeouts during unscription phase
this.joinPush.destroy()

return new Promise((resolve) => {
Expand All @@ -536,6 +537,16 @@ export default class RealtimeChannel {
}
})
}
/**
* Teardown the channel.
*
* Destroys and stops related timers.
*/
teardown() {
this.pushBuffer.forEach((push: Push) => push.destroy())
this.rejoinTimer && clearTimeout(this.rejoinTimer.timer)
this.joinPush.destroy()
}

/** @internal */

Expand Down
60 changes: 24 additions & 36 deletions src/RealtimeClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { WebSocket as WSWebSocket } from 'ws'
import WebSocket from './WebSocket'

import {
CHANNEL_EVENTS,
Expand All @@ -17,6 +17,7 @@ import Timer from './lib/timer'
import { httpEndpointURL } from './lib/transformers'
import RealtimeChannel from './RealtimeChannel'
import type { RealtimeChannelOptions } from './RealtimeChannel'
import Push from './lib/push'

type Fetch = typeof fetch

Expand Down Expand Up @@ -54,7 +55,7 @@ export interface WebSocketLikeConstructor {
): WebSocketLike
}

export type WebSocketLike = WebSocket | WSWebSocket | WSWebSocketDummy
export type WebSocketLike = WebSocket | WSWebSocketDummy

export interface WebSocketLikeError {
error: any
Expand All @@ -81,17 +82,17 @@ export type RealtimeClientOptions = {
accessToken?: () => Promise<string | null>
}

const NATIVE_WEBSOCKET_AVAILABLE = typeof WebSocket !== 'undefined'
const WORKER_SCRIPT = `
addEventListener("message", (e) => {
if (e.data.event === "start") {
setInterval(() => postMessage({ event: "keepAlive" }), e.data.interval);
}
});`

export default class RealtimeClient {
accessTokenValue: string | null = null
apiKey: string | null = null
channels: Set<RealtimeChannel> = new Set()
channels: RealtimeChannel[] = new Array()
endPoint: string = ''
httpEndpoint: string = ''
headers?: { [key: string]: string } = DEFAULT_HEADERS
Expand Down Expand Up @@ -209,33 +210,21 @@ export default class RealtimeClient {
if (this.conn) {
return
}

if (!this.transport) {
this.transport = WebSocket
}
if (this.transport) {
this.conn = new this.transport(this.endpointURL(), undefined, {
headers: this.headers,
})
this.setupConnection()
return
}

if (NATIVE_WEBSOCKET_AVAILABLE) {
this.conn = new WebSocket(this.endpointURL())
this.setupConnection()
return
}

this.conn = new WSWebSocketDummy(this.endpointURL(), undefined, {
close: () => {
this.conn = null
},
})

import('ws').then(({ default: WS }) => {
this.conn = new WS(this.endpointURL(), undefined, {
headers: this.headers,
})
this.setupConnection()
})
}

/**
Expand Down Expand Up @@ -264,17 +253,19 @@ export default class RealtimeClient {
this.conn.close()
}
this.conn = null

// remove open handles
this.heartbeatTimer && clearInterval(this.heartbeatTimer)
this.reconnectTimer.reset()
this.channels.forEach((channel) => channel.teardown())
}
}

/**
* Returns all created channels
*/
getChannels(): RealtimeChannel[] {
return Array.from(this.channels)
return this.channels
}

/**
Expand All @@ -285,9 +276,12 @@ export default class RealtimeClient {
channel: RealtimeChannel
): Promise<RealtimeRemoveChannelResponse> {
const status = await channel.unsubscribe()
if (this.channels.size === 0) {
this.channels = this.channels.filter((c) => c._joinRef !== channel._joinRef)

if (this.channels.length === 0) {
this.disconnect()
}

return status
}

Expand All @@ -296,13 +290,10 @@ export default class RealtimeClient {
*/
async removeAllChannels(): Promise<RealtimeRemoveChannelResponse[]> {
const values_1 = await Promise.all(
Array.from(this.channels).map((channel) => {
this.channels.delete(channel)
return channel.unsubscribe()
})
this.channels.map((channel) => channel.unsubscribe())
)
this.channels = []
this.disconnect()

return values_1
}

Expand Down Expand Up @@ -349,7 +340,8 @@ export default class RealtimeClient {

if (!exists) {
const chan = new RealtimeChannel(`realtime:${topic}`, params, this)
this.channels.add(chan)
this.channels.push(chan)

return chan
} else {
return exists
Expand Down Expand Up @@ -492,7 +484,7 @@ export default class RealtimeClient {
* @internal
*/
_leaveOpenTopic(topic: string): void {
let dupChannel = Array.from(this.channels).find(
let dupChannel = this.channels.find(
(c) => c.topic === topic && (c._isJoined() || c._isJoining())
)
if (dupChannel) {
Expand All @@ -509,7 +501,7 @@ export default class RealtimeClient {
* @internal
*/
_remove(channel: RealtimeChannel) {
this.channels.delete(channel)
this.channels = this.channels.filter((c) => c.topic !== channel.topic)
}

/**
Expand Down Expand Up @@ -560,7 +552,7 @@ export default class RealtimeClient {
}

/** @internal */
private async _onConnOpen() {
private _onConnOpen() {
this.log('transport', `connected to ${this.endpointURL()}`)
this.flushSendBuffer()
this.reconnectTimer.reset()
Expand All @@ -576,11 +568,10 @@ export default class RealtimeClient {
} else {
this.log('worker', `starting default worker`)
}

const objectUrl = this._workerObjectUrl(this.workerUrl!)
this.workerRef = new Worker(objectUrl)
this.workerRef.onerror = (error) => {
this.log('worker', 'worker error', error.message)
this.log('worker', 'worker error', (error as ErrorEvent).message)
this.workerRef!.terminate()
}
this.workerRef.onmessage = (event) => {
Expand All @@ -593,12 +584,10 @@ export default class RealtimeClient {
interval: this.heartbeatIntervalMs,
})
}

this.stateChangeCallbacks.open.forEach((callback) => callback())!
this.stateChangeCallbacks.open.forEach((callback) => callback())
}

/** @internal */

private _onConnClose(event: any) {
this.log('transport', 'close', event)
this._triggerChanError()
Expand Down Expand Up @@ -631,7 +620,6 @@ export default class RealtimeClient {
}
const prefix = url.match(/\?/) ? '&' : '?'
const query = new URLSearchParams(params)

return `${url}${prefix}${query}`
}

Expand Down
4 changes: 4 additions & 0 deletions src/WebSocket.native.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Native/browser WebSocket entry point
const NativeWebSocket = typeof WebSocket !== 'undefined' ? WebSocket : undefined

export default NativeWebSocket
4 changes: 4 additions & 0 deletions src/WebSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Node.js WebSocket entry point
import WebSocket from 'ws'

export default WebSocket
Loading