Skip to content

Commit 3b05094

Browse files
committed
feat: add new API for Broadcast and Presence
1 parent fe349b3 commit 3b05094

File tree

7 files changed

+524
-68
lines changed

7 files changed

+524
-68
lines changed

packages/core/realtime-js/src/RealtimeSubscription.ts renamed to packages/core/realtime-js/src/RealtimeChannel.ts

Lines changed: 136 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@ import { CHANNEL_EVENTS, CHANNEL_STATES } from './lib/constants'
22
import Push from './lib/push'
33
import RealtimeClient from './RealtimeClient'
44
import Timer from './lib/timer'
5+
import RealtimePresence from './RealtimePresence'
56

6-
export default class RealtimeSubscription {
7+
export default class RealtimeChannel {
78
bindings: any[] = []
89
timeout: number
910
state = CHANNEL_STATES.closed
1011
joinedOnce = false
1112
joinPush: Push
1213
rejoinTimer: Timer
1314
pushBuffer: Push[] = []
15+
presence: RealtimePresence
1416

1517
constructor(
1618
public topic: string,
@@ -56,9 +58,38 @@ export default class RealtimeSubscription {
5658
this.state = CHANNEL_STATES.errored
5759
this.rejoinTimer.scheduleTimeout()
5860
})
59-
this.on(CHANNEL_EVENTS.reply, (payload: any, ref: string) => {
61+
this.on(CHANNEL_EVENTS.reply, {}, (payload: any, ref: string) => {
6062
this.trigger(this.replyEventName(ref), payload)
6163
})
64+
this.presence = new RealtimePresence(this)
65+
}
66+
67+
list() {
68+
return this.presence.list()
69+
}
70+
71+
async track(
72+
payload: { [key: string]: any },
73+
opts: { [key: string]: any } = {}
74+
) {
75+
return await this.send(
76+
{
77+
type: 'presence',
78+
event: 'track',
79+
payload,
80+
},
81+
opts
82+
)
83+
}
84+
85+
async untrack(opts: { [key: string]: any } = {}) {
86+
return await this.send(
87+
{
88+
type: 'presence',
89+
event: 'untrack',
90+
},
91+
opts
92+
)
6293
}
6394

6495
rejoinUntilConnected() {
@@ -72,29 +103,69 @@ export default class RealtimeSubscription {
72103
if (this.joinedOnce) {
73104
throw `tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance`
74105
} else {
106+
const configs = this.bindings.reduce(
107+
(acc, binding: { [key: string]: any }) => {
108+
const { type } = binding
109+
if (
110+
![
111+
'phx_close',
112+
'phx_error',
113+
'phx_reply',
114+
'presence_diff',
115+
'presence_state',
116+
].includes(type)
117+
) {
118+
acc[type] = binding
119+
}
120+
return acc
121+
},
122+
{}
123+
)
124+
125+
if (Object.keys(configs).length) {
126+
this.updateJoinPayload({ configs })
127+
}
128+
75129
this.joinedOnce = true
76130
this.rejoin(timeout)
77131
return this.joinPush
78132
}
79133
}
80134

135+
/**
136+
* Registers a callback that will be executed when the channel closes.
137+
*/
81138
onClose(callback: Function) {
82-
this.on(CHANNEL_EVENTS.close, callback)
139+
this.on(CHANNEL_EVENTS.close, {}, callback)
83140
}
84141

142+
/**
143+
* Registers a callback that will be executed when the channel encounteres an error.
144+
*/
85145
onError(callback: Function) {
86-
this.on(CHANNEL_EVENTS.error, (reason: string) => callback(reason))
146+
this.on(CHANNEL_EVENTS.error, {}, (reason: string) => callback(reason))
87147
}
88148

89-
on(event: string, callback: Function) {
90-
this.bindings.push({ event, callback })
149+
on(type: string, filter?: { [key: string]: string }, callback?: Function) {
150+
this.bindings.push({
151+
type,
152+
filter: filter ?? {},
153+
callback: callback ?? (() => {}),
154+
})
91155
}
92156

93-
off(event: string) {
94-
this.bindings = this.bindings.filter((bind) => bind.event !== event)
157+
off(type: string, filter: { [key: string]: any }) {
158+
this.bindings = this.bindings.filter((bind) => {
159+
return !(
160+
bind.type === type && RealtimeChannel.isEqual(bind.filter, filter)
161+
)
162+
})
95163
}
96164

97-
canPush() {
165+
/**
166+
* Returns `true` if the socket is connected and the channel has been joined.
167+
*/
168+
canPush(): boolean {
98169
return this.socket.isConnected() && this.isJoined()
99170
}
100171

@@ -118,24 +189,24 @@ export default class RealtimeSubscription {
118189
}
119190

120191
/**
121-
* Leaves the channel
192+
* Leaves the channel.
122193
*
123194
* Unsubscribes from server events, and instructs channel to terminate on server.
124195
* Triggers onClose() hooks.
125196
*
126197
* To receive leave acknowledgements, use the a `receive` hook to bind to the server ack, ie:
127198
* channel.unsubscribe().receive("ok", () => alert("left!") )
128199
*/
129-
unsubscribe(timeout = this.timeout) {
200+
unsubscribe(timeout = this.timeout): Push {
130201
this.state = CHANNEL_STATES.leaving
131-
let onClose = () => {
202+
const onClose = () => {
132203
this.socket.log('channel', `leave ${this.topic}`)
133204
this.trigger(CHANNEL_EVENTS.close, 'leave', this.joinRef())
134205
}
135206
// Destroy joinPush to avoid connection timeouts during unscription phase
136207
this.joinPush.destroy()
137208

138-
let leavePush = new Push(this, CHANNEL_EVENTS.leave, {}, timeout)
209+
const leavePush = new Push(this, CHANNEL_EVENTS.leave, {}, timeout)
139210
leavePush.receive('ok', () => onClose()).receive('timeout', () => onClose())
140211
leavePush.send()
141212
if (!this.canPush()) {
@@ -155,15 +226,15 @@ export default class RealtimeSubscription {
155226
return payload
156227
}
157228

158-
isMember(topic: string) {
229+
isMember(topic: string): boolean {
159230
return this.topic === topic
160231
}
161232

162-
joinRef() {
233+
joinRef(): string {
163234
return this.joinPush.ref
164235
}
165236

166-
rejoin(timeout = this.timeout) {
237+
rejoin(timeout = this.timeout): void {
167238
if (this.isLeaving()) {
168239
return
169240
}
@@ -172,46 +243,78 @@ export default class RealtimeSubscription {
172243
this.joinPush.resend(timeout)
173244
}
174245

175-
trigger(event: string, payload?: any, ref?: string) {
176-
let { close, error, leave, join } = CHANNEL_EVENTS
177-
let events: string[] = [close, error, leave, join]
178-
if (ref && events.indexOf(event) >= 0 && ref !== this.joinRef()) {
246+
trigger(type: string, payload?: any, ref?: string) {
247+
const { close, error, leave, join } = CHANNEL_EVENTS
248+
const events: string[] = [close, error, leave, join]
249+
if (ref && events.indexOf(type) >= 0 && ref !== this.joinRef()) {
179250
return
180251
}
181-
let handledPayload = this.onMessage(event, payload, ref)
252+
const handledPayload = this.onMessage(type, payload, ref)
182253
if (payload && !handledPayload) {
183254
throw 'channel onMessage callbacks must return the payload, modified or unmodified'
184255
}
185256

186257
this.bindings
187258
.filter((bind) => {
188-
// Bind all events if the user specifies a wildcard.
189-
if (bind.event === '*') {
190-
return event === payload?.type
191-
} else {
192-
return bind.event === event
193-
}
259+
return (
260+
bind?.type === type &&
261+
(bind?.filter?.event === '*' ||
262+
bind?.filter?.event === payload?.event)
263+
)
194264
})
195265
.map((bind) => bind.callback(handledPayload, ref))
196266
}
197267

198-
replyEventName(ref: string) {
268+
send(
269+
payload: { type: string; [key: string]: any },
270+
opts: { [key: string]: any } = {}
271+
) {
272+
const push = this.push(
273+
payload.type as any,
274+
payload,
275+
opts.timeout ?? this.timeout
276+
)
277+
278+
return new Promise((resolve) => {
279+
push.receive('ok', () => resolve('ok'))
280+
push.receive('timeout', () => resolve('timeout'))
281+
})
282+
}
283+
284+
replyEventName(ref: string): string {
199285
return `chan_reply_${ref}`
200286
}
201287

202-
isClosed() {
288+
isClosed(): boolean {
203289
return this.state === CHANNEL_STATES.closed
204290
}
205-
isErrored() {
291+
isErrored(): boolean {
206292
return this.state === CHANNEL_STATES.errored
207293
}
208-
isJoined() {
294+
isJoined(): boolean {
209295
return this.state === CHANNEL_STATES.joined
210296
}
211-
isJoining() {
297+
isJoining(): boolean {
212298
return this.state === CHANNEL_STATES.joining
213299
}
214-
isLeaving() {
300+
isLeaving(): boolean {
215301
return this.state === CHANNEL_STATES.leaving
216302
}
303+
304+
private static isEqual(
305+
obj1: { [key: string]: string },
306+
obj2: { [key: string]: string }
307+
) {
308+
if (Object.keys(obj1).length !== Object.keys(obj2).length) {
309+
return false
310+
}
311+
312+
for (const k in obj1) {
313+
if (obj1[k] !== obj2[k]) {
314+
return false
315+
}
316+
}
317+
318+
return true
319+
}
217320
}

packages/core/realtime-js/src/RealtimeClient.ts

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from './lib/constants'
1212
import Timer from './lib/timer'
1313
import Serializer from './lib/serializer'
14-
import RealtimeSubscription from './RealtimeSubscription'
14+
import RealtimeChannel from './RealtimeChannel'
1515

1616
export type Options = {
1717
transport?: WebSocket
@@ -41,7 +41,7 @@ const noop = () => {}
4141

4242
export default class RealtimeClient {
4343
accessToken: string | null = null
44-
channels: RealtimeSubscription[] = []
44+
channels: RealtimeChannel[] = []
4545
endPoint: string = ''
4646
headers?: { [key: string]: string } = DEFAULT_HEADERS
4747
params?: { [key: string]: string } = {}
@@ -255,14 +255,43 @@ export default class RealtimeClient {
255255
*
256256
* @param channel An open subscription.
257257
*/
258-
remove(channel: RealtimeSubscription) {
258+
remove(channel: RealtimeChannel) {
259259
this.channels = this.channels.filter(
260-
(c: RealtimeSubscription) => c.joinRef() !== channel.joinRef()
260+
(c: RealtimeChannel) => c.joinRef() !== channel.joinRef()
261261
)
262262
}
263263

264-
channel(topic: string, chanParams: ChannelParams = {}): RealtimeSubscription {
265-
const chan = new RealtimeSubscription(topic, chanParams, this)
264+
channel(topic: string, chanParams: ChannelParams = {}): RealtimeChannel {
265+
const { selfBroadcast, ...params } = chanParams
266+
267+
if (selfBroadcast) {
268+
params.self_broadcast = selfBroadcast
269+
}
270+
271+
const chan = new RealtimeChannel(topic, params, this)
272+
273+
chan.presence.onJoin((key, currentPresences, newPresences) => {
274+
chan.trigger('presence', {
275+
event: 'join',
276+
key,
277+
currentPresences,
278+
newPresences,
279+
})
280+
})
281+
282+
chan.presence.onLeave((key, currentPresences, leftPresences) => {
283+
chan.trigger('presence', {
284+
event: 'leave',
285+
key,
286+
currentPresences,
287+
leftPresences,
288+
})
289+
})
290+
291+
chan.presence.onSync(() => {
292+
chan.trigger('presence', { event: 'sync' })
293+
})
294+
266295
this.channels.push(chan)
267296
return chan
268297
}
@@ -306,8 +335,8 @@ export default class RealtimeClient {
306335
payload
307336
)
308337
this.channels
309-
.filter((channel: RealtimeSubscription) => channel.isMember(topic))
310-
.forEach((channel: RealtimeSubscription) =>
338+
.filter((channel: RealtimeChannel) => channel.isMember(topic))
339+
.forEach((channel: RealtimeChannel) =>
311340
channel.trigger(event, payload, ref)
312341
)
313342
this.stateChangeCallbacks.message.forEach((callback) => callback(msg))
@@ -395,7 +424,7 @@ export default class RealtimeClient {
395424
}
396425

397426
private _triggerChanError() {
398-
this.channels.forEach((channel: RealtimeSubscription) =>
427+
this.channels.forEach((channel: RealtimeChannel) =>
399428
channel.trigger(CHANNEL_EVENTS.error)
400429
)
401430
}

0 commit comments

Comments
 (0)