From bf11ad36aebeebbc0b7a74d88d4b9a8fad113456 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 3 Sep 2023 16:39:44 +0100 Subject: [PATCH] Optimize memory usage during broadcasts (#1233) --- setup.cfg | 2 +- src/socketio/asyncio_manager.py | 47 +++++- src/socketio/asyncio_server.py | 17 +- src/socketio/base_manager.py | 41 ++++- src/socketio/server.py | 21 +-- tests/asyncio/test_asyncio_manager.py | 164 ++++++++++++++----- tests/asyncio/test_asyncio_pubsub_manager.py | 13 +- tests/asyncio/test_asyncio_server.py | 83 +++------- tests/common/test_base_manager.py | 121 +++++++++----- tests/common/test_pubsub_manager.py | 9 +- tests/common/test_server.py | 62 ++----- tests/performance/run.sh | 1 + tests/performance/server_send.py | 3 + tests/performance/server_send_broadcast.py | 30 ++++ 14 files changed, 366 insertions(+), 248 deletions(-) create mode 100644 tests/performance/server_send_broadcast.py diff --git a/setup.cfg b/setup.cfg index 860e9f99..712d015a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,7 +25,7 @@ packages = find: python_requires = >=3.6 install_requires = bidict >= 0.21.0 - python-engineio >= 4.3.0 + python-engineio >= 4.7.0 [options.packages.find] where = src diff --git a/src/socketio/asyncio_manager.py b/src/socketio/asyncio_manager.py index 47013dc6..2bf90011 100644 --- a/src/socketio/asyncio_manager.py +++ b/src/socketio/asyncio_manager.py @@ -1,5 +1,7 @@ import asyncio +from engineio import packet as eio_packet +from socketio import packet from .base_manager import BaseManager @@ -17,18 +19,45 @@ async def emit(self, event, data, namespace, room=None, skip_sid=None, """ if namespace not in self.rooms: return - tasks = [] + if isinstance(data, tuple): + # tuples are expanded to multiple arguments, everything else is + # sent as a single argument + data = list(data) + elif data is not None: + data = [data] + else: + data = [] if not isinstance(skip_sid, list): skip_sid = [skip_sid] - for sid, eio_sid in self.get_participants(namespace, room): - if sid not in skip_sid: - if callback is not None: + tasks = [] + if not callback: + # when callbacks aren't used the packets sent to each recipient are + # identical, so they can be generated once and reused + pkt = self.server.packet_class( + packet.EVENT, namespace=namespace, data=[event] + data) + encoded_packet = pkt.encode() + if not isinstance(encoded_packet, list): + encoded_packet = [encoded_packet] + eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p) + for p in encoded_packet] + for sid, eio_sid in self.get_participants(namespace, room): + if sid not in skip_sid: + for p in eio_pkt: + tasks.append(asyncio.create_task( + self.server._send_eio_packet(eio_sid, p))) + else: + # callbacks are used, so each recipient must be sent a packet that + # contains a unique callback id + # note that callbacks when addressing a group of people are + # implemented but not tested or supported + for sid, eio_sid in self.get_participants(namespace, room): + if sid not in skip_sid: # pragma: no branch id = self._generate_ack_id(sid, callback) - else: - id = None - tasks.append(asyncio.create_task( - self.server._emit_internal(eio_sid, event, data, - namespace, id))) + pkt = self.server.packet_class( + packet.EVENT, namespace=namespace, data=[event] + data, + id=id) + tasks.append(asyncio.create_task( + self.server._send_packet(eio_sid, pkt))) if tasks == []: # pragma: no cover return await asyncio.wait(tasks) diff --git a/src/socketio/asyncio_server.py b/src/socketio/asyncio_server.py index e5df2a9b..aff8812a 100644 --- a/src/socketio/asyncio_server.py +++ b/src/socketio/asyncio_server.py @@ -424,19 +424,6 @@ async def sleep(self, seconds=0): """ return await self.eio.sleep(seconds) - async def _emit_internal(self, sid, event, data, namespace=None, id=None): - """Send a message to a client.""" - # tuples are expanded to multiple arguments, everything else is sent - # as a single argument - if isinstance(data, tuple): - data = list(data) - elif data is not None: - data = [data] - else: - data = [] - await self._send_packet(sid, self.packet_class( - packet.EVENT, namespace=namespace, data=[event] + data, id=id)) - async def _send_packet(self, eio_sid, pkt): """Send a Socket.IO packet to a client.""" encoded_packet = pkt.encode() @@ -446,6 +433,10 @@ async def _send_packet(self, eio_sid, pkt): else: await self.eio.send(eio_sid, encoded_packet) + async def _send_eio_packet(self, eio_sid, eio_pkt): + """Send a raw Engine.IO packet to a client.""" + await self.eio.send_packet(eio_sid, eio_pkt) + async def _handle_connect(self, eio_sid, namespace, data): """Handle a client connection request.""" namespace = namespace or '/' diff --git a/src/socketio/base_manager.py b/src/socketio/base_manager.py index 87d23879..4330bac4 100644 --- a/src/socketio/base_manager.py +++ b/src/socketio/base_manager.py @@ -2,6 +2,8 @@ import logging from bidict import bidict, ValueDuplicationError +from engineio import packet as eio_packet +from socketio import packet default_logger = logging.getLogger('socketio') @@ -161,15 +163,42 @@ def emit(self, event, data, namespace, room=None, skip_sid=None, connected to the namespace.""" if namespace not in self.rooms: return + if isinstance(data, tuple): + # tuples are expanded to multiple arguments, everything else is + # sent as a single argument + data = list(data) + elif data is not None: + data = [data] + else: + data = [] if not isinstance(skip_sid, list): skip_sid = [skip_sid] - for sid, eio_sid in self.get_participants(namespace, room): - if sid not in skip_sid: - if callback is not None: + if not callback: + # when callbacks aren't used the packets sent to each recipient are + # identical, so they can be generated once and reused + pkt = self.server.packet_class( + packet.EVENT, namespace=namespace, data=[event] + data) + encoded_packet = pkt.encode() + if not isinstance(encoded_packet, list): + encoded_packet = [encoded_packet] + eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p) + for p in encoded_packet] + for sid, eio_sid in self.get_participants(namespace, room): + if sid not in skip_sid: + for p in eio_pkt: + self.server._send_eio_packet(eio_sid, p) + else: + # callbacks are used, so each recipient must be sent a packet that + # contains a unique callback id + # note that callbacks when addressing a group of people are + # implemented but not tested or supported + for sid, eio_sid in self.get_participants(namespace, room): + if sid not in skip_sid: # pragma: no branch id = self._generate_ack_id(sid, callback) - else: - id = None - self.server._emit_internal(eio_sid, event, data, namespace, id) + pkt = self.server.packet_class( + packet.EVENT, namespace=namespace, data=[event] + data, + id=id) + self.server._send_packet(eio_sid, pkt) def trigger_callback(self, sid, id, data): """Invoke an application callback.""" diff --git a/src/socketio/server.py b/src/socketio/server.py index e8231c31..48d55983 100644 --- a/src/socketio/server.py +++ b/src/socketio/server.py @@ -185,7 +185,7 @@ def on(self, event, handler=None, namespace=None): Example usage:: # as a decorator: - @socket_io.on('connect', namespace='/chat') + @sio.on('connect', namespace='/chat') def connect_handler(sid, environ): print('Connection request') if environ['REMOTE_ADDR'] in blacklisted: @@ -194,7 +194,7 @@ def connect_handler(sid, environ): # as a method: def message_handler(sid, msg): print('Received message: ', msg) - eio.send(sid, 'response') + sio.send(sid, 'response') socket_io.on('message', namespace='/chat', handler=message_handler) The handler function receives the ``sid`` (session ID) for the @@ -633,19 +633,6 @@ def sleep(self, seconds=0): """ return self.eio.sleep(seconds) - def _emit_internal(self, eio_sid, event, data, namespace=None, id=None): - """Send a message to a client.""" - # tuples are expanded to multiple arguments, everything else is sent - # as a single argument - if isinstance(data, tuple): - data = list(data) - elif data is not None: - data = [data] - else: - data = [] - self._send_packet(eio_sid, self.packet_class( - packet.EVENT, namespace=namespace, data=[event] + data, id=id)) - def _send_packet(self, eio_sid, pkt): """Send a Socket.IO packet to a client.""" encoded_packet = pkt.encode() @@ -655,6 +642,10 @@ def _send_packet(self, eio_sid, pkt): else: self.eio.send(eio_sid, encoded_packet) + def _send_eio_packet(self, eio_sid, eio_pkt): + """Send a raw Engine.IO packet to a client.""" + self.eio.send_packet(eio_sid, eio_pkt) + def _handle_connect(self, eio_sid, namespace, data): """Handle a client connection request.""" namespace = namespace or '/' diff --git a/tests/asyncio/test_asyncio_manager.py b/tests/asyncio/test_asyncio_manager.py index 9d9c849b..1fd7b0c4 100644 --- a/tests/asyncio/test_asyncio_manager.py +++ b/tests/asyncio/test_asyncio_manager.py @@ -4,6 +4,7 @@ from unittest import mock from socketio import asyncio_manager +from socketio import packet def AsyncMock(*args, **kwargs): @@ -33,8 +34,10 @@ def generate_id(): return str(id) mock_server = mock.MagicMock() - mock_server._emit_internal = AsyncMock() + mock_server._send_packet = AsyncMock() + mock_server._send_eio_packet = AsyncMock() mock_server.eio.generate_id = generate_id + mock_server.packet_class = packet.Packet self.bm = asyncio_manager.AsyncManager() self.bm.set_server(mock_server) self.bm.initialize() @@ -221,9 +224,11 @@ def test_emit_to_sid(self): 'my event', {'foo': 'bar'}, namespace='/foo', room=sid ) ) - self.bm.server._emit_internal.mock.assert_called_once_with( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.mock.call_count == 1 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_room(self): sid1 = self.bm.connect('123', '/foo') @@ -236,13 +241,15 @@ def test_emit_to_room(self): 'my event', {'foo': 'bar'}, namespace='/foo', room='bar' ) ) - assert self.bm.server._emit_internal.mock.call_count == 2 - self.bm.server._emit_internal.mock.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.mock.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.mock.call_count == 2 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \ + == '456' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \ + == pkt + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_rooms(self): sid1 = self.bm.connect('123', '/foo') @@ -256,16 +263,19 @@ def test_emit_to_rooms(self): self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room=['bar', 'baz']) ) - assert self.bm.server._emit_internal.mock.call_count == 3 - self.bm.server._emit_internal.mock.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.mock.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.mock.assert_any_call( - '789', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.mock.call_count == 3 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \ + == '456' + assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][0] \ + == '789' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \ + == pkt + assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][1] \ + == pkt + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_all(self): sid1 = self.bm.connect('123', '/foo') @@ -275,16 +285,19 @@ def test_emit_to_all(self): self.bm.connect('789', '/foo') self.bm.connect('abc', '/bar') _run(self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo')) - assert self.bm.server._emit_internal.mock.call_count == 3 - self.bm.server._emit_internal.mock.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.mock.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.mock.assert_any_call( - '789', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.mock.call_count == 3 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \ + == '456' + assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][0] \ + == '789' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \ + == pkt + assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][1] \ + == pkt + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_all_skip_one(self): sid1 = self.bm.connect('123', '/foo') @@ -298,13 +311,15 @@ def test_emit_to_all_skip_one(self): 'my event', {'foo': 'bar'}, namespace='/foo', skip_sid=sid2 ) ) - assert self.bm.server._emit_internal.mock.call_count == 2 - self.bm.server._emit_internal.mock.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.mock.assert_any_call( - '789', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.mock.call_count == 2 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \ + == '789' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \ + == pkt + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_all_skip_two(self): sid1 = self.bm.connect('123', '/foo') @@ -321,10 +336,11 @@ def test_emit_to_all_skip_two(self): skip_sid=[sid1, sid3], ) ) - assert self.bm.server._emit_internal.mock.call_count == 1 - self.bm.server._emit_internal.mock.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.mock.call_count == 1 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '456' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_with_callback(self): sid = self.bm.connect('123', '/foo') @@ -336,9 +352,11 @@ def test_emit_with_callback(self): ) ) self.bm._generate_ack_id.assert_called_once_with(sid, 'cb') - self.bm.server._emit_internal.mock.assert_called_once_with( - '123', 'my event', {'foo': 'bar'}, '/foo', 11 - ) + assert self.bm.server._send_packet.mock.call_count == 1 + assert self.bm.server._send_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.bm.server._send_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '2/foo,11["my event",{"foo":"bar"}]' def test_emit_to_invalid_room(self): _run( @@ -347,3 +365,59 @@ def test_emit_to_invalid_room(self): def test_emit_to_invalid_namespace(self): _run(self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo')) + + def test_emit_with_tuple(self): + sid = self.bm.connect('123', '/foo') + _run( + self.bm.emit( + 'my event', ('foo', 'bar'), namespace='/foo', room=sid + ) + ) + assert self.bm.server._send_eio_packet.mock.call_count == 1 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event","foo","bar"]' + + def test_emit_with_list(self): + sid = self.bm.connect('123', '/foo') + _run( + self.bm.emit( + 'my event', ['foo', 'bar'], namespace='/foo', room=sid + ) + ) + assert self.bm.server._send_eio_packet.mock.call_count == 1 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event",["foo","bar"]]' + + def test_emit_with_none(self): + sid = self.bm.connect('123', '/foo') + _run( + self.bm.emit( + 'my event', None, namespace='/foo', room=sid + ) + ) + assert self.bm.server._send_eio_packet.mock.call_count == 1 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event"]' + + def test_emit_binary(self): + sid = self.bm.connect('123', '/') + _run( + self.bm.emit( + u'my event', b'my binary data', namespace='/', room=sid + ) + ) + assert self.bm.server._send_eio_packet.mock.call_count == 2 + assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '451-["my event",{"_placeholder":true,"num":0}]' + assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \ + == '123' + pkt = self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] + assert pkt.encode() == b'my binary data' diff --git a/tests/asyncio/test_asyncio_pubsub_manager.py b/tests/asyncio/test_asyncio_pubsub_manager.py index 80a821b4..5541bbda 100644 --- a/tests/asyncio/test_asyncio_pubsub_manager.py +++ b/tests/asyncio/test_asyncio_pubsub_manager.py @@ -8,6 +8,7 @@ from socketio import asyncio_manager from socketio import asyncio_pubsub_manager +from socketio import packet def AsyncMock(*args, **kwargs): @@ -38,7 +39,9 @@ def generate_id(): mock_server = mock.MagicMock() mock_server.eio.generate_id = generate_id - mock_server._emit_internal = AsyncMock() + mock_server.packet_class = packet.Packet + mock_server._send_packet = AsyncMock() + mock_server._send_eio_packet = AsyncMock() mock_server.disconnect = AsyncMock() self.pm = asyncio_pubsub_manager.AsyncPubSubManager() self.pm._publish = AsyncMock() @@ -164,9 +167,11 @@ def test_emit_with_ignore_queue(self): ) ) self.pm._publish.mock.assert_not_called() - self.pm.server._emit_internal.mock.assert_called_once_with( - '123', 'foo', 'bar', '/', None - ) + assert self.pm.server._send_eio_packet.mock.call_count == 1 + assert self.pm.server._send_eio_packet.mock.call_args_list[0][0][0] \ + == '123' + pkt = self.pm.server._send_eio_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '42["foo","bar"]' def test_can_disconnect(self): sid = self.pm.connect('123', '/') diff --git a/tests/asyncio/test_asyncio_server.py b/tests/asyncio/test_asyncio_server.py index 27d8b378..617aadaf 100644 --- a/tests/asyncio/test_asyncio_server.py +++ b/tests/asyncio/test_asyncio_server.py @@ -5,6 +5,7 @@ from unittest import mock from engineio import json +from engineio import packet as eio_packet import pytest from socketio import asyncio_server @@ -32,7 +33,8 @@ def _run(coro): @unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+') @mock.patch('socketio.server.engineio.AsyncServer', **{ - 'return_value.generate_id.side_effect': [str(i) for i in range(1, 10)]}) + 'return_value.generate_id.side_effect': [str(i) for i in range(1, 10)], + 'return_value.send_packet': AsyncMock()}) class TestAsyncServer(unittest.TestCase): def tearDown(self): # restore JSON encoder, in case a test changed it @@ -295,72 +297,24 @@ def test_handle_request(self, eio): _run(s.handle_request('environ')) s.eio.handle_request.mock.assert_called_once_with('environ') - def test_emit_internal(self, eio): + def test_send_packet(self, eio): eio.return_value.send = AsyncMock() s = asyncio_server.AsyncServer() - _run(s._emit_internal('123', 'my event', 'my data', namespace='/foo')) + _run(s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'my data'], namespace='/foo'))) s.eio.send.mock.assert_called_once_with( '123', '2/foo,["my event","my data"]' ) - def test_emit_internal_with_tuple(self, eio): + def test_send_eio_packet(self, eio): eio.return_value.send = AsyncMock() s = asyncio_server.AsyncServer() - _run( - s._emit_internal( - '123', 'my event', ('foo', 'bar'), namespace='/foo' - ) - ) - s.eio.send.mock.assert_called_once_with( - '123', '2/foo,["my event","foo","bar"]' - ) - - def test_emit_internal_with_list(self, eio): - eio.return_value.send = AsyncMock() - s = asyncio_server.AsyncServer() - _run( - s._emit_internal( - '123', 'my event', ['foo', 'bar'], namespace='/foo' - ) - ) - s.eio.send.mock.assert_called_once_with( - '123', '2/foo,["my event",["foo","bar"]]' - ) - - def test_emit_internal_with_none(self, eio): - eio.return_value.send = AsyncMock() - s = asyncio_server.AsyncServer() - _run(s._emit_internal('123', 'my event', None, namespace='/foo')) - s.eio.send.mock.assert_called_once_with( - '123', '2/foo,["my event"]' - ) - - def test_emit_internal_with_callback(self, eio): - eio.return_value.send = AsyncMock() - s = asyncio_server.AsyncServer() - id = s.manager._generate_ack_id('1', 'cb') - _run( - s._emit_internal( - '123', 'my event', 'my data', namespace='/foo', id=id - ) - ) - s.eio.send.mock.assert_called_once_with( - '123', '2/foo,1["my event","my data"]' - ) - - def test_emit_internal_default_namespace(self, eio): - eio.return_value.send = AsyncMock() - s = asyncio_server.AsyncServer() - _run(s._emit_internal('123', 'my event', 'my data')) - s.eio.send.mock.assert_called_once_with( - '123', '2["my event","my data"]' - ) - - def test_emit_internal_binary(self, eio): - eio.return_value.send = AsyncMock() - s = asyncio_server.AsyncServer() - _run(s._emit_internal('123', u'my event', b'my binary data')) - assert s.eio.send.mock.call_count == 2 + _run(s._send_eio_packet('123', eio_packet.Packet( + eio_packet.MESSAGE, 'hello'))) + assert s.eio.send_packet.mock.call_count == 1 + assert s.eio.send_packet.mock.call_args_list[0][0][0] == '123' + pkt = s.eio.send_packet.mock.call_args_list[0][0][1] + assert pkt.encode() == '4hello' def test_transport(self, eio): eio.return_value.send = AsyncMock() @@ -804,8 +758,10 @@ def test_send_with_ack(self, eio): cb = mock.MagicMock() id1 = s.manager._generate_ack_id('1', cb) id2 = s.manager._generate_ack_id('1', cb) - _run(s._emit_internal('123', 'my event', ['foo'], id=id1)) - _run(s._emit_internal('123', 'my event', ['bar'], id=id2)) + _run(s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'foo'], id=id1))) + _run(s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'bar'], id=id2))) _run(s._handle_eio_message('123', '31["foo",2]')) cb.assert_called_once_with('foo', 2) @@ -818,8 +774,9 @@ def test_send_with_ack_namespace(self, eio): cb = mock.MagicMock() id = s.manager._generate_ack_id('1', cb) _run( - s._emit_internal( - '123', 'my event', ['foo'], namespace='/foo', id=id + s._send_packet( + '123', packet.Packet(packet.EVENT, ['my event', 'foo'], + namespace='/foo', id=id) ) ) _run(s._handle_eio_message('123', '3/foo,1["foo",2]')) diff --git a/tests/common/test_base_manager.py b/tests/common/test_base_manager.py index b9337452..ae1490c8 100644 --- a/tests/common/test_base_manager.py +++ b/tests/common/test_base_manager.py @@ -4,6 +4,7 @@ import pytest from socketio import base_manager +from socketio import packet class TestBaseManager(unittest.TestCase): @@ -17,6 +18,7 @@ def generate_id(): mock_server = mock.MagicMock() mock_server.eio.generate_id = generate_id + mock_server.packet_class = packet.Packet self.bm = base_manager.BaseManager() self.bm.set_server(mock_server) self.bm.initialize() @@ -205,9 +207,10 @@ def test_emit_to_sid(self): sid = self.bm.connect('123', '/foo') self.bm.connect('456', '/foo') self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room=sid) - self.bm.server._emit_internal.assert_called_once_with( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.call_count == 1 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_room(self): sid1 = self.bm.connect('123', '/foo') @@ -216,13 +219,12 @@ def test_emit_to_room(self): self.bm.enter_room(sid2, '/foo', 'bar') self.bm.connect('789', '/foo') self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room='bar') - assert self.bm.server._emit_internal.call_count == 2 - self.bm.server._emit_internal.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.call_count == 2 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '456' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_rooms(self): sid1 = self.bm.connect('123', '/foo') @@ -234,16 +236,14 @@ def test_emit_to_rooms(self): self.bm.enter_room(sid3, '/foo', 'baz') self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room=['bar', 'baz']) - assert self.bm.server._emit_internal.call_count == 3 - self.bm.server._emit_internal.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.assert_any_call( - '789', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.call_count == 3 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '456' + assert self.bm.server._send_eio_packet.call_args_list[2][0][0] == '789' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1] + assert pkt == self.bm.server._send_eio_packet.call_args_list[2][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_all(self): sid1 = self.bm.connect('123', '/foo') @@ -253,16 +253,14 @@ def test_emit_to_all(self): self.bm.connect('789', '/foo') self.bm.connect('abc', '/bar') self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo') - assert self.bm.server._emit_internal.call_count == 3 - self.bm.server._emit_internal.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.assert_any_call( - '789', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.call_count == 3 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '456' + assert self.bm.server._send_eio_packet.call_args_list[2][0][0] == '789' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1] + assert pkt == self.bm.server._send_eio_packet.call_args_list[2][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_all_skip_one(self): sid1 = self.bm.connect('123', '/foo') @@ -274,13 +272,12 @@ def test_emit_to_all_skip_one(self): self.bm.emit( 'my event', {'foo': 'bar'}, namespace='/foo', skip_sid=sid2 ) - assert self.bm.server._emit_internal.call_count == 2 - self.bm.server._emit_internal.assert_any_call( - '123', 'my event', {'foo': 'bar'}, '/foo', None - ) - self.bm.server._emit_internal.assert_any_call( - '789', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.call_count == 2 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '789' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_to_all_skip_two(self): sid1 = self.bm.connect('123', '/foo') @@ -295,10 +292,10 @@ def test_emit_to_all_skip_two(self): namespace='/foo', skip_sid=[sid1, sid3], ) - assert self.bm.server._emit_internal.call_count == 1 - self.bm.server._emit_internal.assert_any_call( - '456', 'my event', {'foo': 'bar'}, '/foo', None - ) + assert self.bm.server._send_eio_packet.call_count == 1 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '456' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]' def test_emit_with_callback(self): sid = self.bm.connect('123', '/foo') @@ -308,12 +305,48 @@ def test_emit_with_callback(self): 'my event', {'foo': 'bar'}, namespace='/foo', callback='cb' ) self.bm._generate_ack_id.assert_called_once_with(sid, 'cb') - self.bm.server._emit_internal.assert_called_once_with( - '123', 'my event', {'foo': 'bar'}, '/foo', 11 - ) + assert self.bm.server._send_packet.call_count == 1 + assert self.bm.server._send_packet.call_args_list[0][0][0] == '123' + pkt = self.bm.server._send_packet.call_args_list[0][0][1] + assert pkt.encode() == '2/foo,11["my event",{"foo":"bar"}]' def test_emit_to_invalid_room(self): self.bm.emit('my event', {'foo': 'bar'}, namespace='/', room='123') def test_emit_to_invalid_namespace(self): self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo') + + def test_emit_with_tuple(self): + sid = self.bm.connect('123', '/foo') + self.bm.emit('my event', ('foo', 'bar'), namespace='/foo', room=sid) + assert self.bm.server._send_eio_packet.call_count == 1 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event","foo","bar"]' + + def test_emit_with_list(self): + sid = self.bm.connect('123', '/foo') + self.bm.emit('my event', ['foo', 'bar'], namespace='/foo', room=sid) + assert self.bm.server._send_eio_packet.call_count == 1 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event",["foo","bar"]]' + + def test_emit_with_none(self): + sid = self.bm.connect('123', '/foo') + self.bm.emit('my event', None, namespace='/foo', room=sid) + assert self.bm.server._send_eio_packet.call_count == 1 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '42/foo,["my event"]' + + def test_emit_binary(self): + sid = self.bm.connect('123', '/') + self.bm.emit(u'my event', b'my binary data', namespace='/', room=sid) + assert self.bm.server._send_eio_packet.call_count == 2 + assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123' + pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '451-["my event",{"_placeholder":true,"num":0}]' + assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '123' + pkt = self.bm.server._send_eio_packet.call_args_list[1][0][1] + assert pkt.encode() == b'my binary data' diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index b1bb53b3..51e1f921 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -7,6 +7,7 @@ from socketio import base_manager from socketio import pubsub_manager +from socketio import packet class TestPubSubManager(unittest.TestCase): @@ -20,6 +21,7 @@ def generate_id(): mock_server = mock.MagicMock() mock_server.eio.generate_id = generate_id + mock_server.packet_class = packet.Packet self.pm = pubsub_manager.PubSubManager() self.pm._publish = mock.MagicMock() self.pm.set_server(mock_server) @@ -157,9 +159,10 @@ def test_emit_with_ignore_queue(self): 'foo', 'bar', room=sid, namespace='/', ignore_queue=True ) self.pm._publish.assert_not_called() - self.pm.server._emit_internal.assert_called_once_with( - '123', 'foo', 'bar', '/', None - ) + assert self.pm.server._send_eio_packet.call_count == 1 + assert self.pm.server._send_eio_packet.call_args_list[0][0][0] == '123' + pkt = self.pm.server._send_eio_packet.call_args_list[0][0][1] + assert pkt.encode() == '42["foo","bar"]' def test_can_disconnect(self): sid = self.pm.connect('123', '/') diff --git a/tests/common/test_server.py b/tests/common/test_server.py index 7d06c934..9285e570 100644 --- a/tests/common/test_server.py +++ b/tests/common/test_server.py @@ -3,6 +3,7 @@ from unittest import mock from engineio import json +from engineio import packet as eio_packet import pytest from socketio import exceptions @@ -271,53 +272,22 @@ def test_handle_request(self, eio): 'environ', 'start_response' ) - def test_emit_internal(self, eio): + def test_send_packet(self, eio): s = server.Server() - s._emit_internal('123', 'my event', 'my data', namespace='/foo') + s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'my data'], namespace='/foo')) s.eio.send.assert_called_once_with( '123', '2/foo,["my event","my data"]' ) - def test_emit_internal_with_tuple(self, eio): + def test_send_eio_packet(self, eio): s = server.Server() - s._emit_internal('123', 'my event', ('foo', 'bar'), namespace='/foo') - s.eio.send.assert_called_once_with( - '123', '2/foo,["my event","foo","bar"]' - ) - - def test_emit_internal_with_list(self, eio): - s = server.Server() - s._emit_internal('123', 'my event', ['foo', 'bar'], namespace='/foo') - s.eio.send.assert_called_once_with( - '123', '2/foo,["my event",["foo","bar"]]' - ) - - def test_emit_internal_with_none(self, eio): - s = server.Server() - s._emit_internal('123', 'my event', None, namespace='/foo') - s.eio.send.assert_called_once_with( - '123', '2/foo,["my event"]' - ) - - def test_emit_internal_with_callback(self, eio): - s = server.Server() - id = s.manager._generate_ack_id('1', 'cb') - s._emit_internal('123', 'my event', 'my data', namespace='/foo', id=id) - s.eio.send.assert_called_once_with( - '123', '2/foo,1["my event","my data"]' - ) - - def test_emit_internal_default_namespace(self, eio): - s = server.Server() - s._emit_internal('123', 'my event', 'my data') - s.eio.send.assert_called_once_with( - '123', '2["my event","my data"]' - ) - - def test_emit_internal_binary(self, eio): - s = server.Server() - s._emit_internal('123', u'my event', b'my binary data') - assert s.eio.send.call_count == 2 + s._send_eio_packet('123', eio_packet.Packet( + eio_packet.MESSAGE, 'hello')) + assert s.eio.send_packet.call_count == 1 + assert s.eio.send_packet.call_args_list[0][0][0] == '123' + pkt = s.eio.send_packet.call_args_list[0][0][1] + assert pkt.encode() == '4hello' def test_transport(self, eio): s = server.Server() @@ -412,7 +382,6 @@ def test_handle_connect_namespace_twice(self, eio): assert s.manager.is_connected('1', '/foo') handler.assert_called_once_with('1', 'environ') s.eio.send.assert_any_call('123', '0/foo,{"sid":"1"}') - print(s.eio.send.call_args_list) s.eio.send.assert_any_call('123', '4/foo,"Unable to connect"') def test_handle_connect_always_connect(self, eio): @@ -714,8 +683,10 @@ def test_send_with_ack(self, eio): cb = mock.MagicMock() id1 = s.manager._generate_ack_id('1', cb) id2 = s.manager._generate_ack_id('1', cb) - s._emit_internal('123', 'my event', ['foo'], id=id1) - s._emit_internal('123', 'my event', ['bar'], id=id2) + s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'foo'], id=id1)) + s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'bar'], id=id2)) s._handle_eio_message('123', '31["foo",2]') cb.assert_called_once_with('foo', 2) @@ -726,7 +697,8 @@ def test_send_with_ack_namespace(self, eio): s._handle_eio_message('123', '0/foo,') cb = mock.MagicMock() id = s.manager._generate_ack_id('1', cb) - s._emit_internal('123', 'my event', ['foo'], namespace='/foo', id=id) + s._send_packet('123', packet.Packet( + packet.EVENT, ['my event', 'foo'], namespace='/foo', id=id)) s._handle_eio_message('123', '3/foo,1["foo",2]') cb.assert_called_once_with('foo', 2) diff --git a/tests/performance/run.sh b/tests/performance/run.sh index 328e6ae1..ed78fc0d 100755 --- a/tests/performance/run.sh +++ b/tests/performance/run.sh @@ -5,3 +5,4 @@ python json_packet.py python namespace_packet.py python server_receive.py python server_send.py +python server_send_broadcast.py diff --git a/tests/performance/server_send.py b/tests/performance/server_send.py index 6c899118..b6b2d70e 100644 --- a/tests/performance/server_send.py +++ b/tests/performance/server_send.py @@ -6,6 +6,9 @@ class Server(socketio.Server): def _send_packet(self, eio_sid, pkt): pass + def _send_eio_packet(self, eio_sid, eio_pkt): + pass + def test(): s = Server() diff --git a/tests/performance/server_send_broadcast.py b/tests/performance/server_send_broadcast.py new file mode 100644 index 00000000..ce99d0a4 --- /dev/null +++ b/tests/performance/server_send_broadcast.py @@ -0,0 +1,30 @@ +import time +import socketio + + +class Server(socketio.Server): + def _send_packet(self, eio_sid, pkt): + pass + + def _send_eio_packet(self, eio_sid, eio_pkt): + pass + + +def test(): + s = Server() + start = time.time() + count = 0 + for i in range(100): + s._handle_eio_connect(str(i), 'environ') + s._handle_eio_message(str(i), '0') + while True: + s.emit('test', 'hello') + count += 1 + if time.time() - start >= 5: + break + return count + + +if __name__ == '__main__': + count = test() + print('server_send:', count, 'packets received.')