-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistributor.py
345 lines (299 loc) · 12.7 KB
/
distributor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
from typing import NewType
import codecs
import json
import threading
import time
import pydbus
from gi.repository import GLib
import requests
server_url = "http://127.0.0.1:8976"
# Type hints for robustness
ServerID = NewType('ServerID', str) # server-side ID
Token = NewType('Token', str) # client-side token
ServiceName = NewType('ServiceName', str) # d-bus service name
class UPMessage():
""" This class contains data passed around between the server communication
part and the D-Bus API.
It contains an internal ID specific to this implementation, as well as the
raw data pushed to the service.
The internal id can be matched to a UnifiedPush-specified
application-provided token.
"""
def __init__(self, id: ServerID, data: bytes):
self.id = id
self.data = data
class Server():
""" Implements the distributor-specific client-server API,
this is not specified by UnifiedPush, and you are free to use whatever
transport you want (websockets, SMS, server-sent events, etc).
For now, this uses HTTP long poll.
"""
clientPath = "/client/id/"
clientPathMultiple = "/client/multi_id/"
registerPath = "/client/register"
pusherPath = "/push/id/"
def __init__(self, url):
self.base_url = url
self.id_set: set[ServerID] = []
self.reconnect_flag = False # When the list of IDs has changed, reco
self.stop_listening_flag = False
self.response: requests.Response | None = None
def unregister(self, id: ServerID):
return True # TODO
def register(self):
try:
r = requests.get(self.base_url + self.registerPath)
j = r.json()
id = j.get("id")
return id # TODO register and return endpoint
except requests.exceptions.RequestException:
return None
def id_to_endpoint(self, id: ServerID):
return self.base_url + self.pusherPath + id
def id_is_registered(self, id: ServerID):
try:
r = requests.get(self.base_url + self.pusherPath)
j = r.json
if j.get("unifiedpush") is not None and \
j.get("unifiedpush").get("version") == 1:
return True
except requests.exceptions.RequestException:
return False # Not strictly true, but not sure what to return TODO
def listen(self, id_set: set[ServerID]):
self.update_listening(id_set)
self.__listen()
def __messages_from_line(self, line: bytes):
print(f"received line: {line}")
try:
j = json.loads(line)
except json.decoder.JSONDecodeError:
print('***Attention***')
return [] # Did not receive the entire line
messages = []
for key in j:
if not isinstance(key, str):
print(f"key {key} is not a str, ignoring")
continue
id = ServerID(key)
for b64message in j[key]:
data = codecs.decode(b64message.encode('ascii'), 'base64')
message = UPMessage(id, data)
messages.append(message)
return messages
def __listen(self):
self.stop_listening_flag = False
while self.stop_listening_flag is False:
if len(self.id_set) == 0:
time.sleep(1)
continue
self.reconnect_flag = False
r = requests.get(self.make_listen_multiple_URL(), stream=True)
self.response = r
# TODO: check that it returns 2xx, if all ids valid
while self.reconnect_flag is False and \
self.stop_listening_flag is False:
line = r.raw.readline()
# time.sleep(0.1)
if line: # TODO: check that this catches keepalives
messages = self.__messages_from_line(line)
self.dbus.send_messages(messages)
def stop_listening(self):
self.stop_listening_flag = True
if self.response is not None:
self.response.close()
def update_listening(self, new_set: set[ServerID]):
if new_set != self.id_set:
self.id_set = new_set
self.reconnect()
def reconnect(self):
""" Reconnect to the push server, only has an effect if listening """
self.reconnect_flag = True
if self.response is not None:
self.response.close()
def make_listen_multiple_URL(self):
url = self.base_url + self.clientPathMultiple
for item in self.id_set:
url += item+"&"
return url[:-1]
class DBus():
""" This class implements the UnifiedPush D-Bus distributor API.
The specified interface is mostly in the Distributor subclass,
and this class implements helper functions to interface with the server
and database.
"""
class PyUPush(): # Dummy D-Bus interface
"""
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN" "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<!--
SPDX-FileCopyrightText: 2022 Mayeul Cantan <oss+up@mayeul.net>
SPDX-License-Identifier: CC0-1.0
-->
<node>
<interface name="org.unifiedPush.Distributor.PyUPush.hello">
<method name="Say_hello">
<arg name="answer" type="s" direction="out"/>
</method>
</interface>
</node>
"""
def Say_hello(self):
return "hello"
class Distributor():
"""
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN" "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<!--
SPDX-FileCopyrightText: 2022 Volker Krause <vkrause@kde.org>
SPDX-License-Identifier: CC0-1.0
-->
<node>
<interface name="org.unifiedpush.Distributor1">
<method name="Register">
<arg name="serviceName" type="s" direction="in"/>
<arg name="token" type="s" direction="in"/>
<arg name="description" type="s" direction="in"/>
<arg name="registrationResult" type="s" direction="out"/>
<arg name="registrationResultReason" type="s" direction="out"/>
</method>
<method name="Unregister">
<arg name="token" type="s" direction="in"/>
<annotation name="org.freedesktop.DBus.Method.NoReply" value="true"/>
</method>
</interface>
</node>
"""
def __init__(self, dbus: 'DBus'):
self.dbus = dbus # TODO: get rid of this, that's not very clean
def Register(self, serviceName: str, token: str, description: str):
"""UnifiedPush-specified register D-Bus method.
Calls into the implementation-specific server registration code,
and returns a success/failure message.
If successful, call the application-provided D-Bus serviceName
to give it the UnifiedPush endpoint
"""
# Silently drop serviceName and description after printing them
print(f"New registration from {serviceName} \
with token:{token} -- {description}")
failure = "REGISTRATION_FAILED"
success = "REGISTRATION_SUCCEEDED"
tk = Token(token)
sn = ServiceName(serviceName)
if registrationDB.token_exists(tk): # Already registered
if server.id_is_registered(registrationDB.get_id(tk)):
return (success, "already registered")
new_id = server.register()
if new_id is None:
return (failure, "error during registration")
status = registrationDB.insert(tk, new_id, sn, description)
if not status:
return (failure, "failed to save info")
self.dbus.send_new_endpoint(sn, tk, server.id_to_endpoint(new_id)) # TODO can MAYBE cause deadlocks?
server.update_listening(registrationDB.id_set())
return (success, "successfully registered")
def Unregister(self, token: str):
""" UnifiedPush-specified unregister D-Bus method. Returns nothing,
but subsequently calls into the application-provided serviceName
to tell it the result of the operation.
"""
tk = Token(token)
unregistered = True
# If we didn't know the token, assume unregistration successful
if registrationDB.token_exists(tk):
unregistered = server.unregister(registrationDB.get_id(tk))
if unregistered:
registrationDB.remove(tk)
server.update_listening(registrationDB.id_set())
self.dbus.tell_unregistered(registrationDB.get_serviceName(tk), # TODO SPEC issue: if not registered, how can I know the service name?
tk, unregistered)
return
def __init__(self):
session_bus = pydbus.SessionBus()
# system_bus = pydbus.SystemBus() # TODO: try to register on system bus
server.dbus = self # FIXME that's a HACK
self.bus = session_bus
self.loop = GLib.MainLoop()
self.bus.publish("org.unifiedpush.Distributor.PyUPush", self.PyUPush(),
("/org/unifiedpush/Distributor", self.Distributor(self)))
self.loop.run()
def stop(self):
self.loop.quit()
def get_connector(self, sn: ServiceName):
return self.bus.get(sn, "/org/unifiedpush/Connector")
def send_messages(self, messages: list[UPMessage]):
for message in messages:
self.send_message(message)
def send_message(self, message: UPMessage):
tk = registrationDB.get_token(message.id)
if tk is None:
return # was probably removed earlier during the sending sequence
sn = registrationDB.get_serviceName(tk)
try:
con = self.get_connector(sn)
except GLib.GError: # TODO: better error handling, this is g-dbus-error-quark: GDBus.Error:org.freedesktop.DBus.Error.ServiceUnknown
print(f"could not find the receiver, unregistering token for id {message.id}")
server.unregister(message.id)
registrationDB.remove(tk)
return
print(f"{tk} -- sending message '{message.data}' to {sn}")
con.Message(tk, message.data, "")
def send_new_endpoint(self, sn: ServiceName, token: Token, endpoint: str):
con = self.get_connector(sn)
con.NewEndpoint(token, endpoint)
def tell_unregistered(self, sn: ServiceName, token: Token, status: bool):
con = self.get_connector(sn)
if status is True:
token = Token("")
con.Unregistered(token)
class RegistrationDB():
""" This class is just an abstract layer above the dictionary
that serves to map D-Bus UnifiedPush tokens to the
target D-Bus service Name, and the internal Client-Server token
"""
class UPclient():
""" Helper structure that stores info about a client that has asked
us to register a UnifiedPush endpoint """
def __init__(self, token: Token, id: ServerID,
serviceName: ServiceName, description: str):
self.id = id
self.serviceName = serviceName
self.description = description
self.token = token
def __init__(self):
self.db: dict[Token, self.UPclient] = {}
def insert(self, token: Token, id: ServerID,
serviceName: ServiceName, description: str):
if token in self.db:
return False
client = self.UPclient(token, id, serviceName, description)
self.db[token] = client
return True
def id_set(self):
return set(client.id for client in self.db.values())
def token_list(self):
return list(tk for tk in self.db)
def token_exists(self, tk: Token):
return tk in self.db
def get_id(self, tk: Token):
client = self.db.get(tk)
if client is not None:
return client.id
return None
def get_token(self, id: ServerID):
for tk in self.db:
if self.db[tk].id == id:
return tk
return None
def remove(self, tk: Token):
if tk not in self.db:
return
del self.db[tk]
def get_serviceName(self, tk: Token):
if tk not in self.db:
return None
return self.db[tk].serviceName
registrationDB = RegistrationDB()
server = Server(server_url)
id_set = registrationDB.id_set()
listen_thread = threading.Thread(target=server.listen, args=(id_set,))
listen_thread.start()
bus = DBus()
listen_thread.join()