1
1
from __future__ import annotations
2
2
import asyncio
3
3
import logging
4
- from typing import Optional , TYPE_CHECKING
4
+ from typing import Optional , TYPE_CHECKING , Dict , Any , Union
5
5
from ably .realtime .connection import ConnectionState
6
6
from ably .transport .websockettransport import ProtocolMessageAction
7
7
from ably .rest .channel import Channel , Channels as RestChannels
14
14
15
15
if TYPE_CHECKING :
16
16
from ably .realtime .realtime import AblyRealtime
17
+ from ably .util .crypto import CipherParams
17
18
18
19
log = logging .getLogger (__name__ )
19
20
20
21
22
+ class ChannelOptions :
23
+ """Channel options for Ably Realtime channels
24
+
25
+ Attributes
26
+ ----------
27
+ cipher : CipherParams, optional
28
+ Requests encryption for this channel when not null, and specifies encryption-related parameters.
29
+ params : Dict[str, str], optional
30
+ Channel parameters that configure the behavior of the channel.
31
+ """
32
+
33
+ def __init__ (self , cipher : Optional [CipherParams ] = None , params : Optional [dict ] = None ):
34
+ self .__cipher = cipher
35
+ self .__params = params
36
+ # Validate params
37
+ if self .__params and not isinstance (self .__params , dict ):
38
+ raise AblyException ("params must be a dictionary" , 40000 , 400 )
39
+
40
+ @property
41
+ def cipher (self ):
42
+ """Get cipher configuration"""
43
+ return self .__cipher
44
+
45
+ @cipher .setter
46
+ def cipher (self , value ):
47
+ """Set cipher configuration"""
48
+ self .__cipher = value
49
+
50
+ @property
51
+ def params (self ) -> Dict [str , str ]:
52
+ """Get channel parameters"""
53
+ return self .__params
54
+
55
+ @params .setter
56
+ def params (self , value : Dict [str , str ]):
57
+ """Set channel parameters"""
58
+ if value and not isinstance (value , dict ):
59
+ raise AblyException ("params must be a dictionary" , 40000 , 400 )
60
+ self .__params = value or {}
61
+
62
+ def __eq__ (self , other ):
63
+ """Check equality with another ChannelOptions instance"""
64
+ if not isinstance (other , ChannelOptions ):
65
+ return False
66
+
67
+ return (self .__cipher == other .__cipher and
68
+ self .__params == other .__params )
69
+
70
+ def __hash__ (self ):
71
+ """Make ChannelOptions hashable"""
72
+ return hash ((
73
+ self .__cipher ,
74
+ tuple (sorted (self .__params .items ())) if self .__params else None ,
75
+ ))
76
+
77
+ def to_dict (self ) -> Dict [str , Any ]:
78
+ """Convert to dictionary representation"""
79
+ result = {}
80
+ if self .__cipher is not None :
81
+ result ['cipher' ] = self .__cipher
82
+ if self .__params :
83
+ result ['params' ] = self .__params
84
+ return result
85
+
86
+ @classmethod
87
+ def from_dict (cls , options_dict : Dict [str , Any ]) -> 'ChannelOptions' :
88
+ """Create ChannelOptions from dictionary"""
89
+ if not isinstance (options_dict , dict ):
90
+ raise AblyException ("options must be a dictionary" , 40000 , 400 )
91
+
92
+ return cls (
93
+ cipher = options_dict .get ('cipher' ),
94
+ params = options_dict .get ('params' ),
95
+ )
96
+
97
+
21
98
class RealtimeChannel (EventEmitter , Channel ):
22
99
"""
23
100
Ably Realtime Channel
@@ -43,23 +120,39 @@ class RealtimeChannel(EventEmitter, Channel):
43
120
Unsubscribe to messages from a channel
44
121
"""
45
122
46
- def __init__ (self , realtime : AblyRealtime , name : str ):
123
+ def __init__ (self , realtime : AblyRealtime , name : str , channel_options : Optional [ ChannelOptions ] = None ):
47
124
EventEmitter .__init__ (self )
48
125
self .__name = name
49
126
self .__realtime = realtime
50
127
self .__state = ChannelState .INITIALIZED
51
128
self .__message_emitter = EventEmitter ()
52
129
self .__state_timer : Optional [Timer ] = None
53
130
self .__attach_resume = False
131
+ self .__attach_serial : Optional [str ] = None
54
132
self .__channel_serial : Optional [str ] = None
55
133
self .__retry_timer : Optional [Timer ] = None
56
134
self .__error_reason : Optional [AblyException ] = None
135
+ self .__channel_options = channel_options or ChannelOptions ()
57
136
58
137
# Used to listen to state changes internally, if we use the public event emitter interface then internals
59
138
# will be disrupted if the user called .off() to remove all listeners
60
139
self .__internal_state_emitter = EventEmitter ()
61
140
62
- Channel .__init__ (self , realtime , name , {})
141
+ # Pass channel options as dictionary to parent Channel class
142
+ Channel .__init__ (self , realtime , name , self .__channel_options .to_dict ())
143
+
144
+ async def set_options (self , channel_options : ChannelOptions ) -> None :
145
+ """Set channel options"""
146
+ old_channel_options = self .__channel_options
147
+ self .__channel_options = channel_options
148
+ # Update parent class options
149
+ self .options = channel_options .to_dict ()
150
+
151
+ if self .should_reattach_to_set_options (old_channel_options , channel_options ):
152
+ self ._attach_impl ()
153
+ state_change = await self .__internal_state_emitter .once_async ()
154
+ if state_change .current in (ChannelState .SUSPENDED , ChannelState .FAILED ):
155
+ raise state_change .reason
63
156
64
157
# RTL4
65
158
async def attach (self ) -> None :
@@ -108,6 +201,7 @@ def _attach_impl(self):
108
201
# RTL4c
109
202
attach_msg = {
110
203
"action" : ProtocolMessageAction .ATTACH ,
204
+ "params" : self .__channel_options .params ,
111
205
"channel" : self .name ,
112
206
}
113
207
@@ -292,8 +386,6 @@ def _on_message(self, proto_msg: dict) -> None:
292
386
action = proto_msg .get ('action' )
293
387
# RTL4c1
294
388
channel_serial = proto_msg .get ('channelSerial' )
295
- if channel_serial :
296
- self .__channel_serial = channel_serial
297
389
# TM2a, TM2c, TM2f
298
390
Message .update_inner_message_fields (proto_msg )
299
391
@@ -314,6 +406,8 @@ def _on_message(self, proto_msg: dict) -> None:
314
406
if not resumed :
315
407
state_change = ChannelStateChange (self .state , ChannelState .ATTACHED , resumed , exception )
316
408
self ._emit ("update" , state_change )
409
+ self .__attach_serial = channel_serial
410
+ self .__channel_serial = channel_serial
317
411
elif self .state == ChannelState .ATTACHING :
318
412
self ._notify_state (ChannelState .ATTACHED , resumed = resumed )
319
413
else :
@@ -327,6 +421,7 @@ def _on_message(self, proto_msg: dict) -> None:
327
421
self ._request_state (ChannelState .ATTACHING )
328
422
elif action == ProtocolMessageAction .MESSAGE :
329
423
messages = Message .from_encoded_array (proto_msg .get ('messages' ))
424
+ self .__channel_serial = channel_serial
330
425
for message in messages :
331
426
self .__message_emitter ._emit (message .name , message )
332
427
elif action == ProtocolMessageAction .ERROR :
@@ -431,6 +526,11 @@ def __on_retry_timer_expire(self) -> None:
431
526
log .info ("RealtimeChannel retry timer expired, attempting a new attach" )
432
527
self ._request_state (ChannelState .ATTACHING )
433
528
529
+ def should_reattach_to_set_options (self , old_options : ChannelOptions , new_options : ChannelOptions ) -> bool :
530
+ if self .state != ChannelState .ATTACHING and self .state != ChannelState .ATTACHED :
531
+ return False
532
+ return old_options != new_options
533
+
434
534
# RTL23
435
535
@property
436
536
def name (self ) -> str :
@@ -466,19 +566,38 @@ class Channels(RestChannels):
466
566
"""
467
567
468
568
# RTS3
469
- def get (self , name : str ) -> RealtimeChannel :
569
+ def get (self , name : str , options : Optional [ Union [ dict , ChannelOptions ]] = None ) -> RealtimeChannel :
470
570
"""Creates a new RealtimeChannel object, or returns the existing channel object.
471
571
472
572
Parameters
473
573
----------
474
574
475
575
name: str
476
576
Channel name
577
+ options: ChannelOptions or dict, optional
578
+ Channel options for the channel
477
579
"""
580
+ # Convert dict to ChannelOptions if needed
581
+ if options is not None :
582
+ if isinstance (options , dict ):
583
+ options = ChannelOptions .from_dict (options )
584
+ elif not isinstance (options , ChannelOptions ):
585
+ raise AblyException ("options must be ChannelOptions instance or dictionary" , 40000 , 400 )
586
+
478
587
if name not in self .__all :
479
- channel = self .__all [name ] = RealtimeChannel (self .__ably , name )
588
+ channel = self .__all [name ] = RealtimeChannel (self .__ably , name , options )
480
589
else :
481
590
channel = self .__all [name ]
591
+ # Update options if channel is not attached or currently attaching
592
+ if channel .should_reattach_to_set_options (channel .__channel_options , options ):
593
+ raise AblyException (
594
+ 'Channels.get() cannot be used to set channel options that would cause the channel to '
595
+ 'reattach. Please, use RealtimeChannel.setOptions() instead.' ,
596
+ 400 ,
597
+ 40000
598
+ )
599
+ else :
600
+ channel .set_options (options )
482
601
return channel
483
602
484
603
# RTS4
0 commit comments