Skip to content

Feature/ Shutting down server #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Oct 3, 2021
Merged
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ The WebsocketServer can be initialized with the below parameters.

*`key`* - If using SSL, this is the path to the key.

*`cert`* - If using SSL, this is the path to the certificate.
*`cert`* - If using SSL, this is the path to the certificate.


### Properties
Expand All @@ -78,6 +78,9 @@ The WebsocketServer can be initialized with the below parameters.
| `set_fn_message_received()` | Sets a callback function that will be called when a `client` sends a message | function | None |
| `send_message()` | Sends a `message` to a specific `client`. The message is a simple string. | client, message | None |
| `send_message_to_all()` | Sends a `message` to **all** connected clients. The message is a simple string. | message | None |
| `shutdown_gracefully()` | Shutdown server by sending a websocket CLOSE handshake to all connected clients. | None | None |
| `shutdown_abruptly()` | Shutdown server without sending any websocket CLOSE handshake. | None | None |



### Callback functions
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
tox>=3.24.0
IPython
pytest
websocket-client
websocket-client>=1.1.1
twine
71 changes: 69 additions & 2 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,73 @@
from utils import session, server
from utils import session, client_session, server
from time import sleep

import websocket
import pytest


def test_send_close(client_session):
"""
Ensure client stops receiving data once we send_close (socket is still open)
"""
client, server = client_session
assert client.received_messages == []

server.send_message_to_all("test1")
sleep(0.5)
assert client.received_messages == ["test1"]

# After CLOSE, client should not be receiving any messages
server.clients[-1]["handler"].send_close()
sleep(0.5)
server.send_message_to_all("test2")
sleep(0.5)
assert client.received_messages == ["test1"]


def test_shutdown_gracefully(client_session):
client, server = client_session
assert client.ws.sock and client.ws.sock.connected
assert server.socket.fileno() > 0

server.shutdown_gracefully()
sleep(0.5)

# Ensure all parties disconnected
assert not client.ws.sock
assert server.socket.fileno() == -1
assert not server.clients


def test_shutdown_abruptly(client_session):
client, server = client_session
assert client.ws.sock and client.ws.sock.connected
assert server.socket.fileno() > 0

server.shutdown_abruptly()
sleep(0.5)

# Ensure server socket died
assert server.socket.fileno() == -1

# Ensure client handler terminated
assert server.received_messages == []
assert client.errors == []
client.ws.send("1st msg after server shutdown")
sleep(0.5)

# Note the message is received since the client handler
# will terminate only once it has received the last message
# and break out of the keep_alive loop. Any consecutive messages
# will not be received though.
assert server.received_messages == ["1st msg after server shutdown"]
assert len(client.errors) == 1
assert isinstance(client.errors[0], websocket._exceptions.WebSocketConnectionClosedException)

# Try to send 2nd message
with pytest.raises(websocket._exceptions.WebSocketConnectionClosedException):
client.ws.send("2nd msg after server shutdown")


def test_client_closes_gracefully(session):
client, server = session
assert client.connected
Expand All @@ -11,7 +76,9 @@ def test_client_closes_gracefully(session):
client.close()
assert not client.connected

# Ensure server closed connection
# Ensure server closed connection.
# We test this by having the server trying to send
# data to the client
assert not server.clients
with pytest.raises(BrokenPipeError):
old_client_handler.connection.send(b"test")
73 changes: 68 additions & 5 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,65 @@
import logging
from time import sleep
from threading import Thread

import pytest
from websocket import create_connection # websocket-client
import websocket # websocket-client

import _bootstrap_
from websocket_server import WebsocketServer


class TestClient():
def __init__(self, port, threaded=True):
self.received_messages = []
self.closes = []
self.opens = []
self.errors = []

websocket.enableTrace(True)
self.ws = websocket.WebSocketApp(f"ws://localhost:{port}/",
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
if threaded:
self.thread = Thread(target=self.ws.run_forever)
self.thread.daemon = True
self.thread.start()
else:
self.ws.run_forever()

def on_message(self, ws, message):
self.received_messages.append(message)
print(f"TestClient: on_message: {message}")

def on_error(self, ws, error):
self.errors.append(error)
print(f"TestClient: on_error: {error}")

def on_close(self, ws, close_status_code, close_msg):
self.closes.append((close_status_code, close_msg))
print(f"TestClient: on_close: {close_status_code} - {close_msg}")

def on_open(self, ws):
self.opens.append(ws)
print("TestClient: on_open")


class TestServer(WebsocketServer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.received_messages = []
self.set_fn_message_received(self.handle_received_message)

def handle_received_message(self, client, server, message):
self.received_messages.append(message)


@pytest.fixture(scope='function')
def server():
""" Returns the response of a server after"""
s = WebsocketServer(0, loglevel=logging.DEBUG)
s = TestServer(0, loglevel=logging.DEBUG)
server_thread = Thread(target=s.run_forever)
server_thread.daemon = True
server_thread.start()
Expand All @@ -21,6 +69,21 @@ def server():

@pytest.fixture
def session(server):
ws = create_connection("ws://{}:{}".format(*server.server_address))
yield ws, server
ws.close()
"""
Gives a simple connection to a server
"""
conn = websocket.create_connection("ws://{}:{}".format(*server.server_address))
yield conn, server
conn.close()


@pytest.fixture
def client_session(server):
"""
Gives a TestClient instance connected to a server
"""
client = TestClient(port=server.port)
sleep(1)
assert client.ws.sock and client.ws.sock.connected
yield client, server
client.ws.close()
60 changes: 60 additions & 0 deletions websocket_server/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
OPCODE_PING = 0x9
OPCODE_PONG = 0xA

CLOSE_STATUS_NORMAL = 1000
DEFAULT_CLOSE_REASON = bytes('', encoding='utf-8')


class API():

Expand Down Expand Up @@ -83,6 +86,12 @@ def send_message(self, client, msg):
def send_message_to_all(self, msg):
self._multicast(msg)

def shutdown_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
self._shutdown_gracefully(status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON)

def shutdown_abruptly(self):
self._shutdown_abruptly()


class WebsocketServer(ThreadingMixIn, TCPServer, API):
"""
Expand Down Expand Up @@ -157,6 +166,36 @@ def handler_to_client(self, handler):
if client['handler'] == handler:
return client

def _terminate_client_handlers(self):
"""
Ensures request handler for each client is terminated correctly
"""
for client in self.clients:
client["handler"].keep_alive = False
client["handler"].finish()
client["handler"].connection.close()

def _shutdown_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
"""
Send a CLOSE handshake to all connected clients before terminating server
"""
self.keep_alive = False

# Send CLOSE to clients
for client in self.clients:
client["handler"].send_close(CLOSE_STATUS_NORMAL, reason)

self._terminate_client_handlers()
self.server_close()

def _shutdown_abruptly(self):
"""
Terminate server without sending a CLOSE handshake
"""
self.keep_alive = False
self._terminate_client_handlers()
self.server_close()


class WebSocketHandler(StreamRequestHandler):

Expand Down Expand Up @@ -245,6 +284,27 @@ def send_message(self, message):
def send_pong(self, message):
self.send_text(message, OPCODE_PONG)

def send_close(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
"""
Send CLOSE to client

Args:
status: Status as defined in https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1
reason: Text with reason of closing the connection
"""
if status < CLOSE_STATUS_NORMAL or status > 1015:
raise Exception(f"CLOSE status must be between 1000 and 1015, got {status}")

header = bytearray()
payload = struct.pack('!H', status) + reason
payload_length = len(payload)
assert payload_length <= 125, "We only support short closing reasons at the moment"

# Send CLOSE with status & reason
header.append(FIN | OPCODE_CLOSE_CONN)
header.append(payload_length)
self.request.send(header + payload)

def send_text(self, message, opcode=OPCODE_TEXT):
"""
Important: Fragmented(=continuation) messages are not supported since
Expand Down