@@ -2,15 +2,17 @@ import { CHANNEL_EVENTS, CHANNEL_STATES } from './lib/constants'
2
2
import Push from './lib/push'
3
3
import RealtimeClient from './RealtimeClient'
4
4
import Timer from './lib/timer'
5
+ import RealtimePresence from './RealtimePresence'
5
6
6
- export default class RealtimeSubscription {
7
+ export default class RealtimeChannel {
7
8
bindings : any [ ] = [ ]
8
9
timeout : number
9
10
state = CHANNEL_STATES . closed
10
11
joinedOnce = false
11
12
joinPush : Push
12
13
rejoinTimer : Timer
13
14
pushBuffer : Push [ ] = [ ]
15
+ presence : RealtimePresence
14
16
15
17
constructor (
16
18
public topic : string ,
@@ -56,9 +58,38 @@ export default class RealtimeSubscription {
56
58
this . state = CHANNEL_STATES . errored
57
59
this . rejoinTimer . scheduleTimeout ( )
58
60
} )
59
- this . on ( CHANNEL_EVENTS . reply , ( payload : any , ref : string ) => {
61
+ this . on ( CHANNEL_EVENTS . reply , { } , ( payload : any , ref : string ) => {
60
62
this . trigger ( this . replyEventName ( ref ) , payload )
61
63
} )
64
+ this . presence = new RealtimePresence ( this )
65
+ }
66
+
67
+ list ( ) {
68
+ return this . presence . list ( )
69
+ }
70
+
71
+ async track (
72
+ payload : { [ key : string ] : any } ,
73
+ opts : { [ key : string ] : any } = { }
74
+ ) {
75
+ return await this . send (
76
+ {
77
+ type : 'presence' ,
78
+ event : 'track' ,
79
+ payload,
80
+ } ,
81
+ opts
82
+ )
83
+ }
84
+
85
+ async untrack ( opts : { [ key : string ] : any } = { } ) {
86
+ return await this . send (
87
+ {
88
+ type : 'presence' ,
89
+ event : 'untrack' ,
90
+ } ,
91
+ opts
92
+ )
62
93
}
63
94
64
95
rejoinUntilConnected ( ) {
@@ -72,29 +103,69 @@ export default class RealtimeSubscription {
72
103
if ( this . joinedOnce ) {
73
104
throw `tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance`
74
105
} else {
106
+ const configs = this . bindings . reduce (
107
+ ( acc , binding : { [ key : string ] : any } ) => {
108
+ const { type } = binding
109
+ if (
110
+ ! [
111
+ 'phx_close' ,
112
+ 'phx_error' ,
113
+ 'phx_reply' ,
114
+ 'presence_diff' ,
115
+ 'presence_state' ,
116
+ ] . includes ( type )
117
+ ) {
118
+ acc [ type ] = binding
119
+ }
120
+ return acc
121
+ } ,
122
+ { }
123
+ )
124
+
125
+ if ( Object . keys ( configs ) . length ) {
126
+ this . updateJoinPayload ( { configs } )
127
+ }
128
+
75
129
this . joinedOnce = true
76
130
this . rejoin ( timeout )
77
131
return this . joinPush
78
132
}
79
133
}
80
134
135
+ /**
136
+ * Registers a callback that will be executed when the channel closes.
137
+ */
81
138
onClose ( callback : Function ) {
82
- this . on ( CHANNEL_EVENTS . close , callback )
139
+ this . on ( CHANNEL_EVENTS . close , { } , callback )
83
140
}
84
141
142
+ /**
143
+ * Registers a callback that will be executed when the channel encounteres an error.
144
+ */
85
145
onError ( callback : Function ) {
86
- this . on ( CHANNEL_EVENTS . error , ( reason : string ) => callback ( reason ) )
146
+ this . on ( CHANNEL_EVENTS . error , { } , ( reason : string ) => callback ( reason ) )
87
147
}
88
148
89
- on ( event : string , callback : Function ) {
90
- this . bindings . push ( { event, callback } )
149
+ on ( type : string , filter ?: { [ key : string ] : string } , callback ?: Function ) {
150
+ this . bindings . push ( {
151
+ type,
152
+ filter : filter ?? { } ,
153
+ callback : callback ?? ( ( ) => { } ) ,
154
+ } )
91
155
}
92
156
93
- off ( event : string ) {
94
- this . bindings = this . bindings . filter ( ( bind ) => bind . event !== event )
157
+ off ( type : string , filter : { [ key : string ] : any } ) {
158
+ this . bindings = this . bindings . filter ( ( bind ) => {
159
+ return ! (
160
+ bind . type === type && RealtimeChannel . isEqual ( bind . filter , filter )
161
+ )
162
+ } )
95
163
}
96
164
97
- canPush ( ) {
165
+ /**
166
+ * Returns `true` if the socket is connected and the channel has been joined.
167
+ */
168
+ canPush ( ) : boolean {
98
169
return this . socket . isConnected ( ) && this . isJoined ( )
99
170
}
100
171
@@ -118,24 +189,24 @@ export default class RealtimeSubscription {
118
189
}
119
190
120
191
/**
121
- * Leaves the channel
192
+ * Leaves the channel.
122
193
*
123
194
* Unsubscribes from server events, and instructs channel to terminate on server.
124
195
* Triggers onClose() hooks.
125
196
*
126
197
* To receive leave acknowledgements, use the a `receive` hook to bind to the server ack, ie:
127
198
* channel.unsubscribe().receive("ok", () => alert("left!") )
128
199
*/
129
- unsubscribe ( timeout = this . timeout ) {
200
+ unsubscribe ( timeout = this . timeout ) : Push {
130
201
this . state = CHANNEL_STATES . leaving
131
- let onClose = ( ) => {
202
+ const onClose = ( ) => {
132
203
this . socket . log ( 'channel' , `leave ${ this . topic } ` )
133
204
this . trigger ( CHANNEL_EVENTS . close , 'leave' , this . joinRef ( ) )
134
205
}
135
206
// Destroy joinPush to avoid connection timeouts during unscription phase
136
207
this . joinPush . destroy ( )
137
208
138
- let leavePush = new Push ( this , CHANNEL_EVENTS . leave , { } , timeout )
209
+ const leavePush = new Push ( this , CHANNEL_EVENTS . leave , { } , timeout )
139
210
leavePush . receive ( 'ok' , ( ) => onClose ( ) ) . receive ( 'timeout' , ( ) => onClose ( ) )
140
211
leavePush . send ( )
141
212
if ( ! this . canPush ( ) ) {
@@ -155,15 +226,15 @@ export default class RealtimeSubscription {
155
226
return payload
156
227
}
157
228
158
- isMember ( topic : string ) {
229
+ isMember ( topic : string ) : boolean {
159
230
return this . topic === topic
160
231
}
161
232
162
- joinRef ( ) {
233
+ joinRef ( ) : string {
163
234
return this . joinPush . ref
164
235
}
165
236
166
- rejoin ( timeout = this . timeout ) {
237
+ rejoin ( timeout = this . timeout ) : void {
167
238
if ( this . isLeaving ( ) ) {
168
239
return
169
240
}
@@ -172,46 +243,78 @@ export default class RealtimeSubscription {
172
243
this . joinPush . resend ( timeout )
173
244
}
174
245
175
- trigger ( event : string , payload ?: any , ref ?: string ) {
176
- let { close, error, leave, join } = CHANNEL_EVENTS
177
- let events : string [ ] = [ close , error , leave , join ]
178
- if ( ref && events . indexOf ( event ) >= 0 && ref !== this . joinRef ( ) ) {
246
+ trigger ( type : string , payload ?: any , ref ?: string ) {
247
+ const { close, error, leave, join } = CHANNEL_EVENTS
248
+ const events : string [ ] = [ close , error , leave , join ]
249
+ if ( ref && events . indexOf ( type ) >= 0 && ref !== this . joinRef ( ) ) {
179
250
return
180
251
}
181
- let handledPayload = this . onMessage ( event , payload , ref )
252
+ const handledPayload = this . onMessage ( type , payload , ref )
182
253
if ( payload && ! handledPayload ) {
183
254
throw 'channel onMessage callbacks must return the payload, modified or unmodified'
184
255
}
185
256
186
257
this . bindings
187
258
. filter ( ( bind ) => {
188
- // Bind all events if the user specifies a wildcard.
189
- if ( bind . event === '*' ) {
190
- return event === payload ?. type
191
- } else {
192
- return bind . event === event
193
- }
259
+ return (
260
+ bind ?. type === type &&
261
+ ( bind ?. filter ?. event === '*' ||
262
+ bind ?. filter ?. event === payload ?. event )
263
+ )
194
264
} )
195
265
. map ( ( bind ) => bind . callback ( handledPayload , ref ) )
196
266
}
197
267
198
- replyEventName ( ref : string ) {
268
+ send (
269
+ payload : { type : string ; [ key : string ] : any } ,
270
+ opts : { [ key : string ] : any } = { }
271
+ ) {
272
+ const push = this . push (
273
+ payload . type as any ,
274
+ payload ,
275
+ opts . timeout ?? this . timeout
276
+ )
277
+
278
+ return new Promise ( ( resolve ) => {
279
+ push . receive ( 'ok' , ( ) => resolve ( 'ok' ) )
280
+ push . receive ( 'timeout' , ( ) => resolve ( 'timeout' ) )
281
+ } )
282
+ }
283
+
284
+ replyEventName ( ref : string ) : string {
199
285
return `chan_reply_${ ref } `
200
286
}
201
287
202
- isClosed ( ) {
288
+ isClosed ( ) : boolean {
203
289
return this . state === CHANNEL_STATES . closed
204
290
}
205
- isErrored ( ) {
291
+ isErrored ( ) : boolean {
206
292
return this . state === CHANNEL_STATES . errored
207
293
}
208
- isJoined ( ) {
294
+ isJoined ( ) : boolean {
209
295
return this . state === CHANNEL_STATES . joined
210
296
}
211
- isJoining ( ) {
297
+ isJoining ( ) : boolean {
212
298
return this . state === CHANNEL_STATES . joining
213
299
}
214
- isLeaving ( ) {
300
+ isLeaving ( ) : boolean {
215
301
return this . state === CHANNEL_STATES . leaving
216
302
}
303
+
304
+ private static isEqual (
305
+ obj1 : { [ key : string ] : string } ,
306
+ obj2 : { [ key : string ] : string }
307
+ ) {
308
+ if ( Object . keys ( obj1 ) . length !== Object . keys ( obj2 ) . length ) {
309
+ return false
310
+ }
311
+
312
+ for ( const k in obj1 ) {
313
+ if ( obj1 [ k ] !== obj2 [ k ] ) {
314
+ return false
315
+ }
316
+ }
317
+
318
+ return true
319
+ }
217
320
}
0 commit comments