Skip to content

Feature/threaded server #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ jobs:
- image: circleci/python:3
steps:
- checkout
- run:
name: install dependencies
command: |
sudo pip install tox
- run:
name: run tests
command: |
tox -e py37
pytest
- store_artifacts:
path: test-reports
destination: test-reports
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ The WebsocketServer can be initialized with the below parameters.

| Method | Description | Takes | Gives |
|-----------------------------|---------------------------------------------------------------------------------------|-----------------|-------|
| `run_forever()` | Runs server until shutdown_gracefully or shutdown_abruptly are called. | threaded: run server on its own thread if True | None |
| `set_fn_new_client()` | Sets a callback function that will be called for every new `client` connecting to us | function | None |
| `set_fn_client_left()` | Sets a callback function that will be called for every `client` disconnecting from us | function | None |
| `set_fn_message_received()` | Sets a callback function that will be called when a `client` sends a message | function | None |
Expand Down
7 changes: 4 additions & 3 deletions docs/release-workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ from development comes with a tag.

General flow

1. Update VERSION in setup.py from development branch and commit
2. Update releases.txt
3. Merge development into master (`git merge --no-ff development`)
1. Get in dev branch
2. Update VERSION in setup.py and releases.txt file
3. Make a commit
4. Merge development into master (`git merge --no-ff development`)
4. Add corresponding version as a new tag (`git tag <new_version>`) e.g. git tag v0.3.0
5. Push everything (`git push --tags && git push`)
3 changes: 3 additions & 0 deletions releases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
0.5.1
- SSL support
- Drop Python 2 support

0.5.4
- Add API for shutting down server (abruptly & gracefully)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from distutils.command.install import install


VERSION = '0.5.1'
VERSION = '0.5.4'


def get_tag_version():
Expand Down
187 changes: 109 additions & 78 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,115 @@
from utils import session, client_session, server
from utils import session, client_session, threaded_server
from time import sleep
import threading

from websocket_server import WebsocketServer

import websocket
import pytest


def test_send_close(client_session):
"""
Ensure client stops receiving data once we send_close (socket is still open)
"""
client, server = client_session
assert client.received_messages == []

server.send_message_to_all("test1")
sleep(0.5)
assert client.received_messages == ["test1"]

# After CLOSE, client should not be receiving any messages
server.clients[-1]["handler"].send_close()
sleep(0.5)
server.send_message_to_all("test2")
sleep(0.5)
assert client.received_messages == ["test1"]


def test_shutdown_gracefully(client_session):
client, server = client_session
assert client.ws.sock and client.ws.sock.connected
assert server.socket.fileno() > 0

server.shutdown_gracefully()
sleep(0.5)

# Ensure all parties disconnected
assert not client.ws.sock
assert server.socket.fileno() == -1
assert not server.clients


def test_shutdown_abruptly(client_session):
client, server = client_session
assert client.ws.sock and client.ws.sock.connected
assert server.socket.fileno() > 0

server.shutdown_abruptly()
sleep(0.5)

# Ensure server socket died
assert server.socket.fileno() == -1

# Ensure client handler terminated
assert server.received_messages == []
assert client.errors == []
client.ws.send("1st msg after server shutdown")
sleep(0.5)

# Note the message is received since the client handler
# will terminate only once it has received the last message
# and break out of the keep_alive loop. Any consecutive messages
# will not be received though.
assert server.received_messages == ["1st msg after server shutdown"]
assert len(client.errors) == 1
assert isinstance(client.errors[0], websocket._exceptions.WebSocketConnectionClosedException)

# Try to send 2nd message
with pytest.raises(websocket._exceptions.WebSocketConnectionClosedException):
client.ws.send("2nd msg after server shutdown")


def test_client_closes_gracefully(session):
client, server = session
assert client.connected
assert server.clients
old_client_handler = server.clients[0]["handler"]
client.close()
assert not client.connected

# Ensure server closed connection.
# We test this by having the server trying to send
# data to the client
assert not server.clients
with pytest.raises(BrokenPipeError):
old_client_handler.connection.send(b"test")
class TestServerThreadedWithoutClient():
def test_run_forever(self, threaded_server):
assert threaded_server.thread
assert not isinstance(threaded_server.thread, threading._MainThread)
assert threaded_server.thread.is_alive()

def test_shutdown(self, threaded_server):
assert threaded_server.thread.is_alive()

# Shutdown de-facto way
# REF: https://docs.python.org/3/library/socketserver.html
# "Tell the serve_forever() loop to stop and
# wait until it does. shutdown() must be called while serve_forever()
# is running in a different thread otherwise it will deadlock."
threaded_server.shutdown()
assert not threaded_server.thread.is_alive()

def test_shutdown_gracefully_without_clients(self, threaded_server):
assert threaded_server.thread.is_alive()
threaded_server.shutdown_gracefully()
assert not threaded_server.thread.is_alive()
assert threaded_server.socket.fileno() <= 0

def test_shutdown_abruptly_without_clients(self, threaded_server):
assert threaded_server.thread.is_alive()
threaded_server.shutdown_abruptly()
assert not threaded_server.thread.is_alive()
assert threaded_server.socket.fileno() <= 0


class TestServerThreadedWithClient():
def test_send_close(self, client_session):
"""
Ensure client stops receiving data once we send_close (socket is still open)
"""
client, server = client_session
assert client.received_messages == []

server.send_message_to_all("test1")
sleep(0.5)
assert client.received_messages == ["test1"]

# After CLOSE, client should not be receiving any messages
server.clients[-1]["handler"].send_close()
sleep(0.5)
server.send_message_to_all("test2")
sleep(0.5)
assert client.received_messages == ["test1"]

def test_shutdown_gracefully(self, client_session):
client, server = client_session
assert client.ws.sock and client.ws.sock.connected
assert server.socket.fileno() > 0

server.shutdown_gracefully()
sleep(0.5)

# Ensure all parties disconnected
assert not client.ws.sock
assert server.socket.fileno() == -1
assert not server.clients

def test_shutdown_abruptly(self, client_session):
client, server = client_session
assert client.ws.sock and client.ws.sock.connected
assert server.socket.fileno() > 0

server.shutdown_abruptly()
sleep(0.5)

# Ensure server socket died
assert server.socket.fileno() == -1

# Ensure client handler terminated
assert server.received_messages == []
assert client.errors == []
client.ws.send("1st msg after server shutdown")
sleep(0.5)

# Note the message is received since the client handler
# will terminate only once it has received the last message
# and break out of the keep_alive loop. Any consecutive messages
# will not be received though.
assert server.received_messages == ["1st msg after server shutdown"]
assert len(client.errors) == 1
assert isinstance(client.errors[0], websocket._exceptions.WebSocketConnectionClosedException)

# Try to send 2nd message
with pytest.raises(websocket._exceptions.WebSocketConnectionClosedException):
client.ws.send("2nd msg after server shutdown")

def test_client_closes_gracefully(self, session):
client, server = session
assert client.connected
assert server.clients
old_client_handler = server.clients[0]["handler"]
client.close()
assert not client.connected

# Ensure server closed connection.
# We test this by having the server trying to send
# data to the client
assert not server.clients
with pytest.raises(BrokenPipeError):
old_client_handler.connection.send(b"test")
24 changes: 11 additions & 13 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,31 @@ def handle_received_message(self, client, server, message):


@pytest.fixture(scope='function')
def server():
def threaded_server():
""" Returns the response of a server after"""
s = TestServer(0, loglevel=logging.DEBUG)
server_thread = Thread(target=s.run_forever)
server_thread.daemon = True
server_thread.start()
yield s
s.server_close()
server = TestServer(0, loglevel=logging.DEBUG)
server.run_forever(threaded=True)
yield server
server.server_close()


@pytest.fixture
def session(server):
def session(threaded_server):
"""
Gives a simple connection to a server
"""
conn = websocket.create_connection("ws://{}:{}".format(*server.server_address))
yield conn, server
conn = websocket.create_connection("ws://{}:{}".format(*threaded_server.server_address))
yield conn, threaded_server
conn.close()


@pytest.fixture
def client_session(server):
def client_session(threaded_server):
"""
Gives a TestClient instance connected to a server
"""
client = TestClient(port=server.port)
client = TestClient(port=threaded_server.port)
sleep(1)
assert client.ws.sock and client.ws.sock.connected
yield client, server
yield client, threaded_server
client.ws.close()
38 changes: 38 additions & 0 deletions websocket_server/thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import threading


class ThreadWithLoggedException(threading.Thread):
"""
Similar to Thread but will log exceptions to passed logger.

Args:
logger: Logger instance used to log any exception in child thread

Exception is also reachable via <thread>.exception from the main thread.
"""

DIVIDER = "*"*80

def __init__(self, *args, **kwargs):
try:
self.logger = kwargs.pop("logger")
except KeyError:
raise Exception("Missing 'logger' in kwargs")
super().__init__(*args, **kwargs)
self.exception = None

def run(self):
try:
if self._target is not None:
self._target(*self._args, **self._kwargs)
except Exception as exception:
thread = threading.current_thread()
self.exception = exception
self.logger.exception(f"{self.DIVIDER}\nException in child thread {thread}: {exception}\n{self.DIVIDER}")
finally:
del self._target, self._args, self._kwargs


class WebsocketServerThread(ThreadWithLoggedException):
"""Dummy wrapper to make debug messages a bit more readable"""
pass
Loading