Skip to content

Commit 31da79f

Browse files
authored
Merge pull request Pithikos#92 from Pithikos/feature/closing-conn
Feature/ Shutting down server
2 parents 3dd0e19 + 65247bd commit 31da79f

File tree

6 files changed

+202
-9
lines changed

6 files changed

+202
-9
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ The WebsocketServer can be initialized with the below parameters.
5959

6060
*`key`* - If using SSL, this is the path to the key.
6161

62-
*`cert`* - If using SSL, this is the path to the certificate.
62+
*`cert`* - If using SSL, this is the path to the certificate.
6363

6464

6565
### Properties
@@ -78,6 +78,9 @@ The WebsocketServer can be initialized with the below parameters.
7878
| `set_fn_message_received()` | Sets a callback function that will be called when a `client` sends a message | function | None |
7979
| `send_message()` | Sends a `message` to a specific `client`. The message is a simple string. | client, message | None |
8080
| `send_message_to_all()` | Sends a `message` to **all** connected clients. The message is a simple string. | message | None |
81+
| `shutdown_gracefully()` | Shutdown server by sending a websocket CLOSE handshake to all connected clients. | None | None |
82+
| `shutdown_abruptly()` | Shutdown server without sending any websocket CLOSE handshake. | None | None |
83+
8184

8285

8386
### Callback functions
File renamed without changes.

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
tox>=3.24.0
33
IPython
44
pytest
5-
websocket-client
5+
websocket-client>=1.1.1
66
twine

tests/test_server.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,73 @@
1-
from utils import session, server
1+
from utils import session, client_session, server
2+
from time import sleep
23

4+
import websocket
35
import pytest
46

57

8+
def test_send_close(client_session):
9+
"""
10+
Ensure client stops receiving data once we send_close (socket is still open)
11+
"""
12+
client, server = client_session
13+
assert client.received_messages == []
14+
15+
server.send_message_to_all("test1")
16+
sleep(0.5)
17+
assert client.received_messages == ["test1"]
18+
19+
# After CLOSE, client should not be receiving any messages
20+
server.clients[-1]["handler"].send_close()
21+
sleep(0.5)
22+
server.send_message_to_all("test2")
23+
sleep(0.5)
24+
assert client.received_messages == ["test1"]
25+
26+
27+
def test_shutdown_gracefully(client_session):
28+
client, server = client_session
29+
assert client.ws.sock and client.ws.sock.connected
30+
assert server.socket.fileno() > 0
31+
32+
server.shutdown_gracefully()
33+
sleep(0.5)
34+
35+
# Ensure all parties disconnected
36+
assert not client.ws.sock
37+
assert server.socket.fileno() == -1
38+
assert not server.clients
39+
40+
41+
def test_shutdown_abruptly(client_session):
42+
client, server = client_session
43+
assert client.ws.sock and client.ws.sock.connected
44+
assert server.socket.fileno() > 0
45+
46+
server.shutdown_abruptly()
47+
sleep(0.5)
48+
49+
# Ensure server socket died
50+
assert server.socket.fileno() == -1
51+
52+
# Ensure client handler terminated
53+
assert server.received_messages == []
54+
assert client.errors == []
55+
client.ws.send("1st msg after server shutdown")
56+
sleep(0.5)
57+
58+
# Note the message is received since the client handler
59+
# will terminate only once it has received the last message
60+
# and break out of the keep_alive loop. Any consecutive messages
61+
# will not be received though.
62+
assert server.received_messages == ["1st msg after server shutdown"]
63+
assert len(client.errors) == 1
64+
assert isinstance(client.errors[0], websocket._exceptions.WebSocketConnectionClosedException)
65+
66+
# Try to send 2nd message
67+
with pytest.raises(websocket._exceptions.WebSocketConnectionClosedException):
68+
client.ws.send("2nd msg after server shutdown")
69+
70+
671
def test_client_closes_gracefully(session):
772
client, server = session
873
assert client.connected
@@ -11,7 +76,9 @@ def test_client_closes_gracefully(session):
1176
client.close()
1277
assert not client.connected
1378

14-
# Ensure server closed connection
79+
# Ensure server closed connection.
80+
# We test this by having the server trying to send
81+
# data to the client
1582
assert not server.clients
1683
with pytest.raises(BrokenPipeError):
1784
old_client_handler.connection.send(b"test")

tests/utils.py

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,65 @@
11
import logging
2+
from time import sleep
23
from threading import Thread
34

45
import pytest
5-
from websocket import create_connection # websocket-client
6+
import websocket # websocket-client
67

78
import _bootstrap_
89
from websocket_server import WebsocketServer
910

1011

12+
class TestClient():
13+
def __init__(self, port, threaded=True):
14+
self.received_messages = []
15+
self.closes = []
16+
self.opens = []
17+
self.errors = []
18+
19+
websocket.enableTrace(True)
20+
self.ws = websocket.WebSocketApp(f"ws://localhost:{port}/",
21+
on_open=self.on_open,
22+
on_message=self.on_message,
23+
on_error=self.on_error,
24+
on_close=self.on_close)
25+
if threaded:
26+
self.thread = Thread(target=self.ws.run_forever)
27+
self.thread.daemon = True
28+
self.thread.start()
29+
else:
30+
self.ws.run_forever()
31+
32+
def on_message(self, ws, message):
33+
self.received_messages.append(message)
34+
print(f"TestClient: on_message: {message}")
35+
36+
def on_error(self, ws, error):
37+
self.errors.append(error)
38+
print(f"TestClient: on_error: {error}")
39+
40+
def on_close(self, ws, close_status_code, close_msg):
41+
self.closes.append((close_status_code, close_msg))
42+
print(f"TestClient: on_close: {close_status_code} - {close_msg}")
43+
44+
def on_open(self, ws):
45+
self.opens.append(ws)
46+
print("TestClient: on_open")
47+
48+
49+
class TestServer(WebsocketServer):
50+
def __init__(self, *args, **kwargs):
51+
super().__init__(*args, **kwargs)
52+
self.received_messages = []
53+
self.set_fn_message_received(self.handle_received_message)
54+
55+
def handle_received_message(self, client, server, message):
56+
self.received_messages.append(message)
57+
58+
1159
@pytest.fixture(scope='function')
1260
def server():
1361
""" Returns the response of a server after"""
14-
s = WebsocketServer(0, loglevel=logging.DEBUG)
62+
s = TestServer(0, loglevel=logging.DEBUG)
1563
server_thread = Thread(target=s.run_forever)
1664
server_thread.daemon = True
1765
server_thread.start()
@@ -21,6 +69,21 @@ def server():
2169

2270
@pytest.fixture
2371
def session(server):
24-
ws = create_connection("ws://{}:{}".format(*server.server_address))
25-
yield ws, server
26-
ws.close()
72+
"""
73+
Gives a simple connection to a server
74+
"""
75+
conn = websocket.create_connection("ws://{}:{}".format(*server.server_address))
76+
yield conn, server
77+
conn.close()
78+
79+
80+
@pytest.fixture
81+
def client_session(server):
82+
"""
83+
Gives a TestClient instance connected to a server
84+
"""
85+
client = TestClient(port=server.port)
86+
sleep(1)
87+
assert client.ws.sock and client.ws.sock.connected
88+
yield client, server
89+
client.ws.close()

websocket_server/websocket_server.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
OPCODE_PING = 0x9
4646
OPCODE_PONG = 0xA
4747

48+
CLOSE_STATUS_NORMAL = 1000
49+
DEFAULT_CLOSE_REASON = bytes('', encoding='utf-8')
50+
4851

4952
class API():
5053

@@ -83,6 +86,12 @@ def send_message(self, client, msg):
8386
def send_message_to_all(self, msg):
8487
self._multicast(msg)
8588

89+
def shutdown_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
90+
self._shutdown_gracefully(status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON)
91+
92+
def shutdown_abruptly(self):
93+
self._shutdown_abruptly()
94+
8695

8796
class WebsocketServer(ThreadingMixIn, TCPServer, API):
8897
"""
@@ -157,6 +166,36 @@ def handler_to_client(self, handler):
157166
if client['handler'] == handler:
158167
return client
159168

169+
def _terminate_client_handlers(self):
170+
"""
171+
Ensures request handler for each client is terminated correctly
172+
"""
173+
for client in self.clients:
174+
client["handler"].keep_alive = False
175+
client["handler"].finish()
176+
client["handler"].connection.close()
177+
178+
def _shutdown_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
179+
"""
180+
Send a CLOSE handshake to all connected clients before terminating server
181+
"""
182+
self.keep_alive = False
183+
184+
# Send CLOSE to clients
185+
for client in self.clients:
186+
client["handler"].send_close(CLOSE_STATUS_NORMAL, reason)
187+
188+
self._terminate_client_handlers()
189+
self.server_close()
190+
191+
def _shutdown_abruptly(self):
192+
"""
193+
Terminate server without sending a CLOSE handshake
194+
"""
195+
self.keep_alive = False
196+
self._terminate_client_handlers()
197+
self.server_close()
198+
160199

161200
class WebSocketHandler(StreamRequestHandler):
162201

@@ -245,6 +284,27 @@ def send_message(self, message):
245284
def send_pong(self, message):
246285
self.send_text(message, OPCODE_PONG)
247286

287+
def send_close(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
288+
"""
289+
Send CLOSE to client
290+
291+
Args:
292+
status: Status as defined in https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1
293+
reason: Text with reason of closing the connection
294+
"""
295+
if status < CLOSE_STATUS_NORMAL or status > 1015:
296+
raise Exception(f"CLOSE status must be between 1000 and 1015, got {status}")
297+
298+
header = bytearray()
299+
payload = struct.pack('!H', status) + reason
300+
payload_length = len(payload)
301+
assert payload_length <= 125, "We only support short closing reasons at the moment"
302+
303+
# Send CLOSE with status & reason
304+
header.append(FIN | OPCODE_CLOSE_CONN)
305+
header.append(payload_length)
306+
self.request.send(header + payload)
307+
248308
def send_text(self, message, opcode=OPCODE_TEXT):
249309
"""
250310
Important: Fragmented(=continuation) messages are not supported since

0 commit comments

Comments
 (0)