Skip to content

Commit 57939c6

Browse files
authored
Merge pull request Pithikos#94 from Pithikos/feature/threaded-server
Feature/threaded server
2 parents 31da79f + afe2c11 commit 57939c6

File tree

9 files changed

+196
-111
lines changed

9 files changed

+196
-111
lines changed

.circleci/config.yml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,10 @@ jobs:
99
- image: circleci/python:3
1010
steps:
1111
- checkout
12-
- run:
13-
name: install dependencies
14-
command: |
15-
sudo pip install tox
1612
- run:
1713
name: run tests
1814
command: |
19-
tox -e py37
15+
pytest
2016
- store_artifacts:
2117
path: test-reports
2218
destination: test-reports

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ The WebsocketServer can be initialized with the below parameters.
7373

7474
| Method | Description | Takes | Gives |
7575
|-----------------------------|---------------------------------------------------------------------------------------|-----------------|-------|
76+
| `run_forever()` | Runs server until shutdown_gracefully or shutdown_abruptly are called. | threaded: run server on its own thread if True | None |
7677
| `set_fn_new_client()` | Sets a callback function that will be called for every new `client` connecting to us | function | None |
7778
| `set_fn_client_left()` | Sets a callback function that will be called for every `client` disconnecting from us | function | None |
7879
| `set_fn_message_received()` | Sets a callback function that will be called when a `client` sends a message | function | None |

docs/release-workflow.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ from development comes with a tag.
66

77
General flow
88

9-
1. Update VERSION in setup.py from development branch and commit
10-
2. Update releases.txt
11-
3. Merge development into master (`git merge --no-ff development`)
9+
1. Get in dev branch
10+
2. Update VERSION in setup.py and releases.txt file
11+
3. Make a commit
12+
4. Merge development into master (`git merge --no-ff development`)
1213
4. Add corresponding version as a new tag (`git tag <new_version>`) e.g. git tag v0.3.0
1314
5. Push everything (`git push --tags && git push`)

releases.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44
0.5.1
55
- SSL support
66
- Drop Python 2 support
7+
8+
0.5.4
9+
- Add API for shutting down server (abruptly & gracefully)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from distutils.command.install import install
1313

1414

15-
VERSION = '0.5.1'
15+
VERSION = '0.5.4'
1616

1717

1818
def get_tag_version():

tests/test_server.py

Lines changed: 109 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,115 @@
1-
from utils import session, client_session, server
1+
from utils import session, client_session, threaded_server
22
from time import sleep
3+
import threading
4+
5+
from websocket_server import WebsocketServer
36

47
import websocket
58
import pytest
69

710

8-
def test_send_close(client_session):
9-
"""
10-
Ensure client stops receiving data once we send_close (socket is still open)
11-
"""
12-
client, server = client_session
13-
assert client.received_messages == []
14-
15-
server.send_message_to_all("test1")
16-
sleep(0.5)
17-
assert client.received_messages == ["test1"]
18-
19-
# After CLOSE, client should not be receiving any messages
20-
server.clients[-1]["handler"].send_close()
21-
sleep(0.5)
22-
server.send_message_to_all("test2")
23-
sleep(0.5)
24-
assert client.received_messages == ["test1"]
25-
26-
27-
def test_shutdown_gracefully(client_session):
28-
client, server = client_session
29-
assert client.ws.sock and client.ws.sock.connected
30-
assert server.socket.fileno() > 0
31-
32-
server.shutdown_gracefully()
33-
sleep(0.5)
34-
35-
# Ensure all parties disconnected
36-
assert not client.ws.sock
37-
assert server.socket.fileno() == -1
38-
assert not server.clients
39-
40-
41-
def test_shutdown_abruptly(client_session):
42-
client, server = client_session
43-
assert client.ws.sock and client.ws.sock.connected
44-
assert server.socket.fileno() > 0
45-
46-
server.shutdown_abruptly()
47-
sleep(0.5)
48-
49-
# Ensure server socket died
50-
assert server.socket.fileno() == -1
51-
52-
# Ensure client handler terminated
53-
assert server.received_messages == []
54-
assert client.errors == []
55-
client.ws.send("1st msg after server shutdown")
56-
sleep(0.5)
57-
58-
# Note the message is received since the client handler
59-
# will terminate only once it has received the last message
60-
# and break out of the keep_alive loop. Any consecutive messages
61-
# will not be received though.
62-
assert server.received_messages == ["1st msg after server shutdown"]
63-
assert len(client.errors) == 1
64-
assert isinstance(client.errors[0], websocket._exceptions.WebSocketConnectionClosedException)
65-
66-
# Try to send 2nd message
67-
with pytest.raises(websocket._exceptions.WebSocketConnectionClosedException):
68-
client.ws.send("2nd msg after server shutdown")
69-
70-
71-
def test_client_closes_gracefully(session):
72-
client, server = session
73-
assert client.connected
74-
assert server.clients
75-
old_client_handler = server.clients[0]["handler"]
76-
client.close()
77-
assert not client.connected
78-
79-
# Ensure server closed connection.
80-
# We test this by having the server trying to send
81-
# data to the client
82-
assert not server.clients
83-
with pytest.raises(BrokenPipeError):
84-
old_client_handler.connection.send(b"test")
11+
class TestServerThreadedWithoutClient():
12+
def test_run_forever(self, threaded_server):
13+
assert threaded_server.thread
14+
assert not isinstance(threaded_server.thread, threading._MainThread)
15+
assert threaded_server.thread.is_alive()
16+
17+
def test_shutdown(self, threaded_server):
18+
assert threaded_server.thread.is_alive()
19+
20+
# Shutdown de-facto way
21+
# REF: https://docs.python.org/3/library/socketserver.html
22+
# "Tell the serve_forever() loop to stop and
23+
# wait until it does. shutdown() must be called while serve_forever()
24+
# is running in a different thread otherwise it will deadlock."
25+
threaded_server.shutdown()
26+
assert not threaded_server.thread.is_alive()
27+
28+
def test_shutdown_gracefully_without_clients(self, threaded_server):
29+
assert threaded_server.thread.is_alive()
30+
threaded_server.shutdown_gracefully()
31+
assert not threaded_server.thread.is_alive()
32+
assert threaded_server.socket.fileno() <= 0
33+
34+
def test_shutdown_abruptly_without_clients(self, threaded_server):
35+
assert threaded_server.thread.is_alive()
36+
threaded_server.shutdown_abruptly()
37+
assert not threaded_server.thread.is_alive()
38+
assert threaded_server.socket.fileno() <= 0
39+
40+
41+
class TestServerThreadedWithClient():
42+
def test_send_close(self, client_session):
43+
"""
44+
Ensure client stops receiving data once we send_close (socket is still open)
45+
"""
46+
client, server = client_session
47+
assert client.received_messages == []
48+
49+
server.send_message_to_all("test1")
50+
sleep(0.5)
51+
assert client.received_messages == ["test1"]
52+
53+
# After CLOSE, client should not be receiving any messages
54+
server.clients[-1]["handler"].send_close()
55+
sleep(0.5)
56+
server.send_message_to_all("test2")
57+
sleep(0.5)
58+
assert client.received_messages == ["test1"]
59+
60+
def test_shutdown_gracefully(self, client_session):
61+
client, server = client_session
62+
assert client.ws.sock and client.ws.sock.connected
63+
assert server.socket.fileno() > 0
64+
65+
server.shutdown_gracefully()
66+
sleep(0.5)
67+
68+
# Ensure all parties disconnected
69+
assert not client.ws.sock
70+
assert server.socket.fileno() == -1
71+
assert not server.clients
72+
73+
def test_shutdown_abruptly(self, client_session):
74+
client, server = client_session
75+
assert client.ws.sock and client.ws.sock.connected
76+
assert server.socket.fileno() > 0
77+
78+
server.shutdown_abruptly()
79+
sleep(0.5)
80+
81+
# Ensure server socket died
82+
assert server.socket.fileno() == -1
83+
84+
# Ensure client handler terminated
85+
assert server.received_messages == []
86+
assert client.errors == []
87+
client.ws.send("1st msg after server shutdown")
88+
sleep(0.5)
89+
90+
# Note the message is received since the client handler
91+
# will terminate only once it has received the last message
92+
# and break out of the keep_alive loop. Any consecutive messages
93+
# will not be received though.
94+
assert server.received_messages == ["1st msg after server shutdown"]
95+
assert len(client.errors) == 1
96+
assert isinstance(client.errors[0], websocket._exceptions.WebSocketConnectionClosedException)
97+
98+
# Try to send 2nd message
99+
with pytest.raises(websocket._exceptions.WebSocketConnectionClosedException):
100+
client.ws.send("2nd msg after server shutdown")
101+
102+
def test_client_closes_gracefully(self, session):
103+
client, server = session
104+
assert client.connected
105+
assert server.clients
106+
old_client_handler = server.clients[0]["handler"]
107+
client.close()
108+
assert not client.connected
109+
110+
# Ensure server closed connection.
111+
# We test this by having the server trying to send
112+
# data to the client
113+
assert not server.clients
114+
with pytest.raises(BrokenPipeError):
115+
old_client_handler.connection.send(b"test")

tests/utils.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,33 +57,31 @@ def handle_received_message(self, client, server, message):
5757

5858

5959
@pytest.fixture(scope='function')
60-
def server():
60+
def threaded_server():
6161
""" Returns the response of a server after"""
62-
s = TestServer(0, loglevel=logging.DEBUG)
63-
server_thread = Thread(target=s.run_forever)
64-
server_thread.daemon = True
65-
server_thread.start()
66-
yield s
67-
s.server_close()
62+
server = TestServer(0, loglevel=logging.DEBUG)
63+
server.run_forever(threaded=True)
64+
yield server
65+
server.server_close()
6866

6967

7068
@pytest.fixture
71-
def session(server):
69+
def session(threaded_server):
7270
"""
7371
Gives a simple connection to a server
7472
"""
75-
conn = websocket.create_connection("ws://{}:{}".format(*server.server_address))
76-
yield conn, server
73+
conn = websocket.create_connection("ws://{}:{}".format(*threaded_server.server_address))
74+
yield conn, threaded_server
7775
conn.close()
7876

7977

8078
@pytest.fixture
81-
def client_session(server):
79+
def client_session(threaded_server):
8280
"""
8381
Gives a TestClient instance connected to a server
8482
"""
85-
client = TestClient(port=server.port)
83+
client = TestClient(port=threaded_server.port)
8684
sleep(1)
8785
assert client.ws.sock and client.ws.sock.connected
88-
yield client, server
86+
yield client, threaded_server
8987
client.ws.close()

websocket_server/thread.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import threading
2+
3+
4+
class ThreadWithLoggedException(threading.Thread):
5+
"""
6+
Similar to Thread but will log exceptions to passed logger.
7+
8+
Args:
9+
logger: Logger instance used to log any exception in child thread
10+
11+
Exception is also reachable via <thread>.exception from the main thread.
12+
"""
13+
14+
DIVIDER = "*"*80
15+
16+
def __init__(self, *args, **kwargs):
17+
try:
18+
self.logger = kwargs.pop("logger")
19+
except KeyError:
20+
raise Exception("Missing 'logger' in kwargs")
21+
super().__init__(*args, **kwargs)
22+
self.exception = None
23+
24+
def run(self):
25+
try:
26+
if self._target is not None:
27+
self._target(*self._args, **self._kwargs)
28+
except Exception as exception:
29+
thread = threading.current_thread()
30+
self.exception = exception
31+
self.logger.exception(f"{self.DIVIDER}\nException in child thread {thread}: {exception}\n{self.DIVIDER}")
32+
finally:
33+
del self._target, self._args, self._kwargs
34+
35+
36+
class WebsocketServerThread(ThreadWithLoggedException):
37+
"""Dummy wrapper to make debug messages a bit more readable"""
38+
pass

0 commit comments

Comments
 (0)