Skip to content

Commit 4ecbc82

Browse files
authored
feat: replace asyncio websocket with a more easy to use threading one (#87)
1 parent d7063b2 commit 4ecbc82

File tree

9 files changed

+138
-83
lines changed

9 files changed

+138
-83
lines changed

Pipfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ name = "pypi"
66
[packages]
77
types-requests = "*"
88
requests = "*"
9-
websockets = "*"
9+
"websocket-client" = "*"
1010
zeroconf = "*"
1111
types-dataclasses = "*"
1212
dataclasses = "*"

Pipfile.lock

Lines changed: 6 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

evasdk/Eva.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .helpers import (strip_ip)
44
from .eva_http_client import EvaHTTPClient
55
from .eva_locker import EvaWithLocker
6+
from .eva_ws import Websocket
67

78

89
class Eva:
@@ -29,6 +30,15 @@ def set_request_timeout(self, request_timeout):
2930
__TEACH_RENEW_PERIOD = 3
3031

3132

33+
# --------------------------------------------- Websocket -----------------------------------------------
34+
def websocket(self):
35+
# Ensure we have a session token
36+
# might use the result in the future to give the initial state to consumers
37+
self.__http_client.data_snapshot()
38+
host_uri = f'ws://{self.__http_client.host_ip}/api/v1/data/stream'
39+
subprotocols = [f'SessionToken_{self.__http_client.session_token}', "object"]
40+
return Websocket(host_uri, subprotocols, timeout=self.__http_client.request_timeout)
41+
3242
# --------------------------------------------- Lock Holder ---------------------------------------------
3343
def __enter__(self):
3444
self.__eva_locker.__enter__()

evasdk/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,12 @@
6969

7070
from .Eva import Eva
7171
from .eva_http_client import EvaHTTPClient
72-
from .eva_ws import ws_connect
7372
from .robot_state import RobotState
7473
from .helpers import strip_ip
7574
from .eva_errors import (
7675
EvaError,
7776
EvaValidationError, EvaAuthError, EvaAutoRenewError,
78-
EvaAdminError, EvaServerError)
77+
EvaAdminError, EvaServerError, EvaWebsocketError)
7978
from .version import __version__
8079
from .EvaDiscoverer import (
8180
DiscoverCallback, DiscoveredEva,

evasdk/eva_errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class EvaAutoRenewError(EvaError, Exception):
3030
"""Error thrown when automatic session renewal fails but not the original request"""
3131

3232

33+
class EvaWebsocketError(EvaError, Exception):
34+
"""Error thrown when an issue related to the websocket stream happens"""
35+
36+
3337
def eva_error(label, r=None):
3438
if r is not None:
3539
__handle_http_error(label, r)

evasdk/eva_ws.py

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,89 @@
1-
import websockets
1+
import websocket # type: ignore
2+
import logging
3+
import json
4+
from threading import Thread, Condition
5+
from .eva_errors import EvaWebsocketError
6+
from .observer import Subject
27

38

4-
async def ws_connect(host_ip, session_token):
5-
"""
6-
Connect is an async function that returns a connected Eva websocket
9+
logger = logging.getLogger(__name__)
10+
711

8-
Connect needs to be run from a asyncio event_loop and retuns a
9-
websockets.Websocket object. Using this you can manually call .recv()
10-
and .send(), make sure to also .close() when you are finished to clean
11-
up the websocket connection.
12+
class Websocket:
1213
"""
13-
host_uri = 'ws://{}/api/v1/data/stream'.format(host_ip)
14-
subprotocols = ['SessionToken_{}'.format(session_token), "object"]
14+
This class creates a context which runs a thread to monitor a websocket in the background.
15+
It follows the Observer pattern, which you can use to listen to specific event or `all` to get all of them.
16+
Note: notifications will be sent from a different thread so you will need to use a mutex or other synchronization mechanims.
17+
"""
18+
19+
def __init__(self, url, protocols, timeout=5):
20+
self.__thread = Thread(target=self.__run)
21+
self.__url = url
22+
self.__protocols = protocols
23+
self.__app = None
24+
self.__ws = None
25+
self.__cond = Condition()
26+
self.__timeout = timeout
27+
self.__subject = Subject()
28+
29+
def __enter__(self):
30+
self.__thread.start()
31+
with self.__cond:
32+
connected = self.__cond.wait(timeout=self.__timeout)
33+
if not connected:
34+
raise EvaWebsocketError('could not connect to Eva\'s data stream')
35+
return self
36+
37+
def __on_open(self, ws):
38+
logger.debug('ws_on_open')
39+
with self.__cond:
40+
self.__ws = ws
41+
self.__cond.notify_all()
42+
43+
def __on_message(self, ws, raw):
44+
msg = json.loads(raw)
45+
if 'type' not in msg or msg['type'] != 'state_change':
46+
logger.debug('ws_on_message', msg)
47+
self.__subject.notify('all', msg)
48+
if 'type' in msg:
49+
self.__subject.notify(msg['type'], msg)
50+
else:
51+
logger.debug('missing message type', msg)
52+
53+
def __on_error(self, ws, error):
54+
logger.debug('ws_on_error', error)
55+
with self.__cond:
56+
self.__ws = None
57+
58+
def __on_close(self, ws, close_status_code, close_msg):
59+
logger.debug('ws_on_close', close_status_code, close_msg)
60+
with self.__cond:
61+
self.__ws = None
62+
63+
def __exit__(self, type, value, traceback):
64+
if self.__app is not None:
65+
self.__app.keep_running = False
66+
self.__thread.join()
67+
68+
def __run(self):
69+
self.__app = websocket.WebSocketApp(
70+
self.__url,
71+
subprotocols=self.__protocols,
72+
on_open=self.__on_open,
73+
on_message=self.__on_message,
74+
on_error=self.__on_error,
75+
on_close=self.__on_close,
76+
)
77+
self.__app.run_forever()
78+
79+
def _send_raw(self, data):
80+
with self.__cond:
81+
if self.__ws is None:
82+
raise EvaWebsocketError('Eva\'s data stream has disconnected')
83+
self.__ws.send(json.dumps(data))
84+
85+
def register(self, event, callback):
86+
self.__subject.register(event, callback)
1587

16-
ws = await websockets.client.connect(host_uri, subprotocols=subprotocols)
17-
return ws
88+
def deregister(self, event, callback):
89+
self.__subject.deregister(event, callback)

evasdk/observer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
class Subject:
2+
def __init__(self):
3+
self.events = {}
4+
5+
def register(self, event, callback):
6+
if event not in self.events:
7+
self.events[event] = set()
8+
self.events[event].add(callback)
9+
10+
def deregister(self, event, callback):
11+
if event not in self.events:
12+
return
13+
if callback not in self.events[event]:
14+
return
15+
self.events[event].remove(callback)
16+
17+
def notify(self, event, *args):
18+
if event not in self.events:
19+
return
20+
for callback in self.events[event]:
21+
callback(*args)

examples/http_ws_example.py

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
#!/usr/bin/env python3
22

3-
import evasdk
4-
import json
5-
import asyncio
6-
import time
3+
from evasdk import Eva
74

85
# This example shows usage of the eva_ws and eva_http modules, used for direct control
96
# using the network interfaces. eva_http also contains some helper functions not
@@ -12,33 +9,16 @@
129
host_ip = input("Please enter a Eva IP: ")
1310
token = input("Please enter a valid Eva token: ")
1411

15-
print('ip: [{}], token: [{}]\n'.format(host_ip, token))
12+
print(f'ip: [{host_ip}], token: [{token}]\n')
1613

17-
http_client = evasdk.EvaHTTPClient(host_ip, token)
14+
eva = Eva(host_ip, token)
1815

19-
# The session token will be valid for 30 minutes, you'll need to renew the session
20-
# if you want the websocket connection to continue after that point.
21-
session_token = http_client.auth_create_session()
16+
users = eva.users_get()
17+
print(f'Eva at {host_ip} users: {users}\n')
2218

23-
users = http_client.users_get()
24-
print('Eva at {} users: {}\n'.format(host_ip, users))
19+
joint_angles = eva.data_servo_positions()
20+
print(f'Eva current joint angles: {joint_angles}')
2521

26-
joint_angles = http_client.data_servo_positions()
27-
print('Eva current joint angles: {}'.format(joint_angles))
28-
29-
30-
async def eva_ws_example(host_ip, session_token):
31-
websocket = await evasdk.ws_connect(host_ip, session_token)
32-
33-
msg_count = 0
34-
time_since_msg = time.time()
35-
while True:
36-
ws_msg_json = await websocket.recv()
37-
print('WS msg delta T: {}, number: {}'.format(time.time() - time_since_msg, msg_count))
38-
msg_count += 1
39-
time_since_msg = time.time()
40-
41-
ws_msg = json.loads(ws_msg_json)
42-
print(ws_msg)
43-
44-
asyncio.get_event_loop().run_until_complete(eva_ws_example(host_ip, session_token))
22+
with eva.websocket() as ws:
23+
ws.register('state_change', print)
24+
input('press return when you want to stop\n')

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
long_description_content_type="text/markdown",
1919
install_requires=[
2020
'requests',
21-
'websockets',
21+
'websocket-client',
2222
'zeroconf',
2323
'dataclasses',
2424
# TODO: too big, install it manually if you want it

0 commit comments

Comments
 (0)