Skip to content

Commit

Permalink
Optimize memory usage during broadcasts (#1233)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg authored Sep 3, 2023
1 parent f49d65a commit bf11ad3
Show file tree
Hide file tree
Showing 14 changed files with 366 additions and 248 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ packages = find:
python_requires = >=3.6
install_requires =
bidict >= 0.21.0
python-engineio >= 4.3.0
python-engineio >= 4.7.0

[options.packages.find]
where = src
Expand Down
47 changes: 38 additions & 9 deletions src/socketio/asyncio_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio

from engineio import packet as eio_packet
from socketio import packet
from .base_manager import BaseManager


Expand All @@ -17,18 +19,45 @@ async def emit(self, event, data, namespace, room=None, skip_sid=None,
"""
if namespace not in self.rooms:
return
tasks = []
if isinstance(data, tuple):
# tuples are expanded to multiple arguments, everything else is
# sent as a single argument
data = list(data)
elif data is not None:
data = [data]
else:
data = []
if not isinstance(skip_sid, list):
skip_sid = [skip_sid]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
if callback is not None:
tasks = []
if not callback:
# when callbacks aren't used the packets sent to each recipient are
# identical, so they can be generated once and reused
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data)
encoded_packet = pkt.encode()
if not isinstance(encoded_packet, list):
encoded_packet = [encoded_packet]
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
for p in encoded_packet]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
for p in eio_pkt:
tasks.append(asyncio.create_task(
self.server._send_eio_packet(eio_sid, p)))
else:
# callbacks are used, so each recipient must be sent a packet that
# contains a unique callback id
# note that callbacks when addressing a group of people are
# implemented but not tested or supported
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid: # pragma: no branch
id = self._generate_ack_id(sid, callback)
else:
id = None
tasks.append(asyncio.create_task(
self.server._emit_internal(eio_sid, event, data,
namespace, id)))
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data,
id=id)
tasks.append(asyncio.create_task(
self.server._send_packet(eio_sid, pkt)))
if tasks == []: # pragma: no cover
return
await asyncio.wait(tasks)
Expand Down
17 changes: 4 additions & 13 deletions src/socketio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,19 +424,6 @@ async def sleep(self, seconds=0):
"""
return await self.eio.sleep(seconds)

async def _emit_internal(self, sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
await self._send_packet(sid, self.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data, id=id))

async def _send_packet(self, eio_sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
Expand All @@ -446,6 +433,10 @@ async def _send_packet(self, eio_sid, pkt):
else:
await self.eio.send(eio_sid, encoded_packet)

async def _send_eio_packet(self, eio_sid, eio_pkt):
"""Send a raw Engine.IO packet to a client."""
await self.eio.send_packet(eio_sid, eio_pkt)

async def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'
Expand Down
41 changes: 35 additions & 6 deletions src/socketio/base_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging

from bidict import bidict, ValueDuplicationError
from engineio import packet as eio_packet
from socketio import packet

default_logger = logging.getLogger('socketio')

Expand Down Expand Up @@ -161,15 +163,42 @@ def emit(self, event, data, namespace, room=None, skip_sid=None,
connected to the namespace."""
if namespace not in self.rooms:
return
if isinstance(data, tuple):
# tuples are expanded to multiple arguments, everything else is
# sent as a single argument
data = list(data)
elif data is not None:
data = [data]
else:
data = []
if not isinstance(skip_sid, list):
skip_sid = [skip_sid]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
if callback is not None:
if not callback:
# when callbacks aren't used the packets sent to each recipient are
# identical, so they can be generated once and reused
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data)
encoded_packet = pkt.encode()
if not isinstance(encoded_packet, list):
encoded_packet = [encoded_packet]
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
for p in encoded_packet]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
for p in eio_pkt:
self.server._send_eio_packet(eio_sid, p)
else:
# callbacks are used, so each recipient must be sent a packet that
# contains a unique callback id
# note that callbacks when addressing a group of people are
# implemented but not tested or supported
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid: # pragma: no branch
id = self._generate_ack_id(sid, callback)
else:
id = None
self.server._emit_internal(eio_sid, event, data, namespace, id)
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data,
id=id)
self.server._send_packet(eio_sid, pkt)

def trigger_callback(self, sid, id, data):
"""Invoke an application callback."""
Expand Down
21 changes: 6 additions & 15 deletions src/socketio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def on(self, event, handler=None, namespace=None):
Example usage::
# as a decorator:
@socket_io.on('connect', namespace='/chat')
@sio.on('connect', namespace='/chat')
def connect_handler(sid, environ):
print('Connection request')
if environ['REMOTE_ADDR'] in blacklisted:
Expand All @@ -194,7 +194,7 @@ def connect_handler(sid, environ):
# as a method:
def message_handler(sid, msg):
print('Received message: ', msg)
eio.send(sid, 'response')
sio.send(sid, 'response')
socket_io.on('message', namespace='/chat', handler=message_handler)
The handler function receives the ``sid`` (session ID) for the
Expand Down Expand Up @@ -633,19 +633,6 @@ def sleep(self, seconds=0):
"""
return self.eio.sleep(seconds)

def _emit_internal(self, eio_sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
self._send_packet(eio_sid, self.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data, id=id))

def _send_packet(self, eio_sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
Expand All @@ -655,6 +642,10 @@ def _send_packet(self, eio_sid, pkt):
else:
self.eio.send(eio_sid, encoded_packet)

def _send_eio_packet(self, eio_sid, eio_pkt):
"""Send a raw Engine.IO packet to a client."""
self.eio.send_packet(eio_sid, eio_pkt)

def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'
Expand Down
Loading

0 comments on commit bf11ad3

Please sign in to comment.