-
Notifications
You must be signed in to change notification settings - Fork 14
/
example.py
204 lines (156 loc) · 6.6 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import asyncio
import logging
import signal
from centrifuge import (
CentrifugeError,
Client,
ClientEventHandler,
ConnectedContext,
ConnectingContext,
DisconnectedContext,
ErrorContext,
JoinContext,
LeaveContext,
PublicationContext,
SubscribedContext,
SubscribingContext,
SubscriptionErrorContext,
UnsubscribedContext,
SubscriptionEventHandler,
ServerSubscribedContext,
ServerSubscribingContext,
ServerUnsubscribedContext,
ServerPublicationContext,
ServerJoinContext,
ServerLeaveContext,
)
# Root logging setup: timestamped records, INFO level and above.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Configure centrifuge-python logger: enable DEBUG output for the "centrifuge" logger.
cf_logger = logging.getLogger("centrifuge")
cf_logger.setLevel(logging.DEBUG)
async def get_client_token() -> str:
    """Return the connection token for the client.

    This example logs the call and returns a hard-coded token. REPLACE it with
    your own logic to get a token from the backend.

    To reject the connection, raise ``centrifuge.UnauthorizedError()`` instead
    of returning a token.
    """
    logging.info("get client token called")
    # Hard-coded example token, split into chunks only to keep lines short.
    token_chunks = (
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI0MiIsImV4cCI6Nzc1NDI2MTQwOSwiaWF0Ij",
        "oxNzA2MjYxNDA5fQ.9jQEr9XqAW1BY9oolmawhtLRx1ZLJZS6ivgYznuf4-Y",
    )
    return "".join(token_chunks)
async def get_subscription_token(channel: str) -> str:
    """Return the subscription token for ``channel``.

    This example logs the requested channel and returns the same hard-coded
    token regardless of the channel. REPLACE it with your own logic to get a
    token from the backend.

    To reject the subscription, raise ``centrifuge.UnauthorizedError()``
    instead of returning a token.
    """
    logging.info("get subscription token called for channel %s", channel)
    # Hard-coded example token, split into chunks only to keep lines short.
    token_chunks = (
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI0MiIsImV4cCI6Nzc1NDQ0MzA1NywiaWF0IjoxNz",
        "A2NDQzMDU3LCJjaGFubmVsIjoiZXhhbXBsZTpjaGFubmVsIn0._rcyM78ol1MgCqngA4Vyt8P3o1SnDX_hSXhEA",
        "_xByKU",
    )
    return "".join(token_chunks)
class ClientEventLoggerHandler(ClientEventHandler):
    """Client-level event handler that logs every event it receives.

    Check out comments of ClientEventHandler methods to see when they are called.
    """

    # --- Connection events -------------------------------------------------

    async def on_connecting(self, ctx: ConnectingContext) -> None:
        """Log the connecting context at INFO level."""
        logging.info("connecting: %s", ctx)

    async def on_connected(self, ctx: ConnectedContext) -> None:
        """Log the connected context at INFO level."""
        logging.info("connected: %s", ctx)

    async def on_disconnected(self, ctx: DisconnectedContext) -> None:
        """Log the disconnected context at INFO level."""
        logging.info("disconnected: %s", ctx)

    async def on_error(self, ctx: ErrorContext) -> None:
        """Log the client error context at ERROR level."""
        logging.error("client error: %s", ctx)

    # --- Server-side subscription events -----------------------------------

    async def on_subscribed(self, ctx: ServerSubscribedContext) -> None:
        """Log the server-side subscribed context at INFO level."""
        logging.info("subscribed server-side sub: %s", ctx)

    async def on_subscribing(self, ctx: ServerSubscribingContext) -> None:
        """Log the server-side subscribing context at INFO level."""
        logging.info("subscribing server-side sub: %s", ctx)

    async def on_unsubscribed(self, ctx: ServerUnsubscribedContext) -> None:
        """Log the server-side unsubscribed context at INFO level."""
        logging.info("unsubscribed from server-side sub: %s", ctx)

    async def on_publication(self, ctx: ServerPublicationContext) -> None:
        """Log the data of a server-side publication (``ctx.pub.data``) at INFO level."""
        logging.info("publication from server-side sub: %s", ctx.pub.data)

    async def on_join(self, ctx: ServerJoinContext) -> None:
        """Log the server-side join context at INFO level."""
        logging.info("join in server-side sub: %s", ctx)

    async def on_leave(self, ctx: ServerLeaveContext) -> None:
        """Log the server-side leave context at INFO level."""
        logging.info("leave in server-side sub: %s", ctx)
class SubscriptionEventLoggerHandler(SubscriptionEventHandler):
    """Subscription-level event handler that logs every event it receives.

    Check out comments of SubscriptionEventHandler methods to see when they are called.
    """

    async def on_subscribing(self, ctx: SubscribingContext) -> None:
        """Log the subscribing context at INFO level."""
        logging.info("subscribing: %s", ctx)

    async def on_subscribed(self, ctx: SubscribedContext) -> None:
        """Log the subscribed context at INFO level."""
        logging.info("subscribed: %s", ctx)

    async def on_unsubscribed(self, ctx: UnsubscribedContext) -> None:
        """Log the unsubscribed context at INFO level."""
        logging.info("unsubscribed: %s", ctx)

    async def on_publication(self, ctx: PublicationContext) -> None:
        """Log the publication data (``ctx.pub.data``) at INFO level."""
        logging.info("publication: %s", ctx.pub.data)

    async def on_join(self, ctx: JoinContext) -> None:
        """Log the join context at INFO level."""
        logging.info("join: %s", ctx)

    async def on_leave(self, ctx: LeaveContext) -> None:
        """Log the leave context at INFO level."""
        logging.info("leave: %s", ctx)

    async def on_error(self, ctx: SubscriptionErrorContext) -> None:
        """Log the subscription error context at ERROR level."""
        logging.error("subscription error: %s", ctx)
def run_example():
    """Run the example: connect, subscribe, issue a few calls, then wait for a signal.

    Creates a client for ``ws://localhost:8000/connection/websocket`` and a
    subscription to ``example:channel``, schedules a coroutine that connects,
    subscribes and then calls publish / presence_stats / presence / history
    (logging any ``CentrifugeError``), and runs the event loop until SIGTERM or
    SIGINT is received. On a signal the client is disconnected, all other
    outstanding tasks are cancelled and the loop is stopped and closed.

    Blocks the calling thread until shutdown completes.
    """
    # Create the event loop explicitly instead of relying on the implicit one:
    # calling asyncio.get_event_loop()/ensure_future() with no current loop is
    # deprecated and raises RuntimeError on recent Python versions.
    # The loop is installed as the current loop *before* the client is built so
    # that any code looking up the current event loop during construction sees
    # this same loop (NOTE(review): whether Client does so is not visible here).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    client = Client(
        "ws://localhost:8000/connection/websocket",
        events=ClientEventLoggerHandler(),
        get_token=get_client_token,
        use_protobuf=False,
    )

    sub = client.new_subscription(
        "example:channel",
        events=SubscriptionEventLoggerHandler(),
        get_token=get_subscription_token,
        # you can pass `delta=centrifuge.DeltaType.FOSSIL` (should be also enabled on server)
        # and other options here.
    )

    # The event loop only keeps weak references to tasks, so hold strong
    # references here until each task finishes.
    background_tasks = set()

    def spawn_task(coro):
        """Schedule ``coro`` on the loop and keep it referenced until it is done."""
        task = loop.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    async def run():
        """Connect, subscribe and exercise the subscription API once."""
        await client.connect()
        await sub.subscribe()

        try:
            # Note that in Protobuf case we need to encode payloads to bytes:
            # result = await sub.publish(data=json.dumps({"input": "test"}).encode())
            # But in JSON protocol case we can just pass dict which will be encoded to
            # JSON automatically.
            await sub.publish(data={"input": "test"})
        except CentrifugeError as e:
            logging.error("error publish: %s", e)

        try:
            result = await sub.presence_stats()
            logging.info(result)
        except CentrifugeError as e:
            logging.error("error presence stats: %s", e)

        try:
            result = await sub.presence()
            logging.info(result)
        except CentrifugeError as e:
            logging.error("error presence: %s", e)

        try:
            result = await sub.history(limit=1, reverse=True)
            logging.info(result)
        except CentrifugeError as e:
            logging.error("error history: %s", e)

        logging.info("all done, client connection is still alive, press Ctrl+C to exit")

    async def shutdown(received_signal):
        """Disconnect the client, cancel every other task and stop the loop."""
        logging.info("received exit signal %s...", received_signal.name)
        await client.disconnect()

        # Everything except this shutdown task itself gets cancelled.
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()

        logging.info("Cancelling outstanding tasks")
        # return_exceptions=True so the CancelledError of each task is collected
        # rather than propagated out of gather().
        await asyncio.gather(*tasks, return_exceptions=True)
        loop.stop()

    spawn_task(run())

    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # Bind the signal as a default argument so each handler reports its own
        # signal rather than the last value of the loop variable.
        loop.add_signal_handler(
            s, lambda received_signal=s: spawn_task(shutdown(received_signal))
        )

    try:
        loop.run_forever()
    finally:
        loop.close()
        logging.info("successfully completed service shutdown")
# Script entry point: run the example only when this file is executed directly.
if __name__ == "__main__":
    run_example()