Skip to content

Commit

Permalink
Support entering and leaving rooms through pubsub client managers
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Sep 19, 2023
1 parent 3f78af2 commit d222f4c
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 0 deletions.
35 changes: 35 additions & 0 deletions src/socketio/asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ async def disconnect(self, sid, namespace, **kwargs):
await self._handle_disconnect(message) # handle in this host
await self._publish(message) # notify other hosts

async def enter_room(self, sid, namespace, room, eio_sid=None):
if self.is_connected(sid, namespace):
# client is in this server, so we can disconnect directly
return await super().enter_room(sid, namespace, room,
eio_sid=eio_sid)
else:
message = {'method': 'enter_room', 'sid': sid, 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
await self._publish(message) # notify other hosts

async def leave_room(self, sid, namespace, room):
if self.is_connected(sid, namespace):
# client is in this server, so we can disconnect directly
return await super().leave_room(sid, namespace, room)
else:
message = {'method': 'leave_room', 'sid': sid, 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
await self._publish(message) # notify other hosts

async def close_room(self, room, namespace=None):
message = {'method': 'close_room', 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
Expand Down Expand Up @@ -158,6 +177,18 @@ async def _handle_disconnect(self, message):
namespace=message.get('namespace'),
ignore_queue=True)

async def _handle_enter_room(self, message):
sid = message.get('sid')
namespace = message.get('namespace')
if self.is_connected(sid, namespace):
await super().enter_room(sid, namespace, message.get('room'))

async def _handle_leave_room(self, message):
sid = message.get('sid')
namespace = message.get('namespace')
if self.is_connected(sid, namespace):
await super().leave_room(sid, namespace, message.get('room'))

async def _handle_close_room(self, message):
await super().close_room(room=message.get('room'),
namespace=message.get('namespace'))
Expand Down Expand Up @@ -191,6 +222,10 @@ async def _thread(self):
await self._handle_emit(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'enter_room':
await self._handle_enter_room(data)
elif data['method'] == 'leave_room':
await self._handle_leave_room(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
except asyncio.CancelledError:
Expand Down
34 changes: 34 additions & 0 deletions src/socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ def disconnect(self, sid, namespace=None, **kwargs):
self._handle_disconnect(message) # handle in this host
self._publish(message) # notify other hosts

def enter_room(self, sid, namespace, room, eio_sid=None):
if self.is_connected(sid, namespace):
# client is in this server, so we can add to the room directly
return super().enter_room(sid, namespace, room, eio_sid=eio_sid)
else:
message = {'method': 'enter_room', 'sid': sid, 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
self._publish(message) # notify other hosts

def leave_room(self, sid, namespace, room):
if self.is_connected(sid, namespace):
# client is in this server, so we can remove from the room directly
return super().leave_room(sid, namespace, room)
else:
message = {'method': 'leave_room', 'sid': sid, 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
self._publish(message) # notify other hosts

def close_room(self, room, namespace=None):
message = {'method': 'close_room', 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
Expand Down Expand Up @@ -153,6 +171,18 @@ def _handle_disconnect(self, message):
namespace=message.get('namespace'),
ignore_queue=True)

def _handle_enter_room(self, message):
sid = message.get('sid')
namespace = message.get('namespace')
if self.is_connected(sid, namespace):
super().enter_room(sid, namespace, message.get('room'))

def _handle_leave_room(self, message):
sid = message.get('sid')
namespace = message.get('namespace')
if self.is_connected(sid, namespace):
super().leave_room(sid, namespace, message.get('room'))

def _handle_close_room(self, message):
super().close_room(room=message.get('room'),
namespace=message.get('namespace'))
Expand Down Expand Up @@ -184,6 +214,10 @@ def _thread(self):
self._handle_emit(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'enter_room':
self._handle_enter_room(data)
elif data['method'] == 'leave_room':
self._handle_leave_room(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
except:
Expand Down
77 changes: 77 additions & 0 deletions tests/asyncio/test_asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,27 @@ def test_disconnect_ignore_queue(self):
self.pm._publish.mock.assert_not_called()
assert self.pm.is_connected(sid, '/') is False

def test_enter_room(self):
sid = self.pm.connect('123', '/')
_run(self.pm.enter_room(sid, '/', 'foo'))
_run(self.pm.enter_room('456', '/', 'foo'))
assert sid in self.pm.rooms['/']['foo']
assert self.pm.rooms['/']['foo'][sid] == '123'
self.pm._publish.mock.assert_called_once_with(
{'method': 'enter_room', 'sid': '456', 'room': 'foo',
'namespace': '/', 'host_id': '123456'}
)

def test_leave_room(self):
sid = self.pm.connect('123', '/')
_run(self.pm.leave_room(sid, '/', 'foo'))
_run(self.pm.leave_room('456', '/', 'foo'))
assert 'foo' not in self.pm.rooms['/']
self.pm._publish.mock.assert_called_once_with(
{'method': 'leave_room', 'sid': '456', 'room': 'foo',
'namespace': '/', 'host_id': '123456'}
)

def test_close_room(self):
_run(self.pm.close_room('foo'))
self.pm._publish.mock.assert_called_once_with(
Expand Down Expand Up @@ -413,6 +434,48 @@ def test_handle_disconnect(self):
sid='123', namespace='/foo', ignore_queue=True
)

def test_handle_enter_room(self):
sid = self.pm.connect('123', '/')
with mock.patch.object(
asyncio_manager.AsyncManager, 'enter_room', new=AsyncMock()
) as super_enter_room:
_run(
self.pm._handle_enter_room(
{'method': 'enter_room', 'sid': sid, 'namespace': '/',
'room': 'foo'}
)
)
_run(
self.pm._handle_enter_room(
{'method': 'enter_room', 'sid': '456', 'namespace': '/',
'room': 'foo'}
)
)
super_enter_room.mock.assert_called_once_with(
self.pm, sid, '/', 'foo'
)

def test_handle_leave_room(self):
sid = self.pm.connect('123', '/')
with mock.patch.object(
asyncio_manager.AsyncManager, 'leave_room', new=AsyncMock()
) as super_leave_room:
_run(
self.pm._handle_leave_room(
{'method': 'leave_room', 'sid': sid, 'namespace': '/',
'room': 'foo'}
)
)
_run(
self.pm._handle_leave_room(
{'method': 'leave_room', 'sid': '456', 'namespace': '/',
'room': 'foo'}
)
)
super_leave_room.mock.assert_called_once_with(
self.pm, sid, '/', 'foo'
)

def test_handle_close_room(self):
with mock.patch.object(
asyncio_manager.AsyncManager, 'close_room', new=AsyncMock()
Expand Down Expand Up @@ -447,6 +510,8 @@ def test_background_thread(self):
self.pm._handle_emit = AsyncMock()
self.pm._handle_callback = AsyncMock()
self.pm._handle_disconnect = AsyncMock()
self.pm._handle_enter_room = AsyncMock()
self.pm._handle_leave_room = AsyncMock()
self.pm._handle_close_room = AsyncMock()
host_id = self.pm.host_id

Expand All @@ -461,6 +526,10 @@ async def messages():
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield 'bad json'
yield b'bad pickled'

Expand Down Expand Up @@ -490,6 +559,14 @@ async def messages():
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
)
self.pm._handle_enter_room.mock.assert_called_once_with(
{'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
)
self.pm._handle_leave_room.mock.assert_called_once_with(
{'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
)
self.pm._handle_close_room.mock.assert_called_once_with(
{'method': 'close_room', 'value': 'baz', 'host_id': 'x'}
)
Expand Down
61 changes: 61 additions & 0 deletions tests/common/test_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ def test_disconnect_ignore_queue(self):
self.pm._publish.assert_not_called()
assert not self.pm.is_connected(sid, '/')

def test_enter_room(self):
sid = self.pm.connect('123', '/')
self.pm.enter_room(sid, '/', 'foo')
self.pm.enter_room('456', '/', 'foo')
assert sid in self.pm.rooms['/']['foo']
assert self.pm.rooms['/']['foo'][sid] == '123'
self.pm._publish.assert_called_once_with(
{'method': 'enter_room', 'sid': '456', 'room': 'foo',
'namespace': '/', 'host_id': '123456'}
)

def test_leave_room(self):
sid = self.pm.connect('123', '/')
self.pm.leave_room(sid, '/', 'foo')
self.pm.leave_room('456', '/', 'foo')
assert 'foo' not in self.pm.rooms['/']
self.pm._publish.assert_called_once_with(
{'method': 'leave_room', 'sid': '456', 'room': 'foo',
'namespace': '/', 'host_id': '123456'}
)

def test_close_room(self):
self.pm.close_room('foo')
self.pm._publish.assert_called_once_with(
Expand Down Expand Up @@ -373,6 +394,32 @@ def test_handle_disconnect(self):
sid='123', namespace='/foo', ignore_queue=True
)

def test_handle_enter_room(self):
sid = self.pm.connect('123', '/')
with mock.patch.object(
base_manager.BaseManager, 'enter_room'
) as super_enter_room:
self.pm._handle_enter_room({
'method': 'enter_room', 'sid': sid, 'namespace': '/',
'room': 'foo'})
self.pm._handle_enter_room({
'method': 'enter_room', 'sid': '456', 'namespace': '/',
'room': 'foo'})
super_enter_room.assert_called_once_with(sid, '/', 'foo')

def test_handle_leave_room(self):
sid = self.pm.connect('123', '/')
with mock.patch.object(
base_manager.BaseManager, 'leave_room'
) as super_leave_room:
self.pm._handle_leave_room({
'method': 'leave_room', 'sid': sid, 'namespace': '/',
'room': 'foo'})
self.pm._handle_leave_room({
'method': 'leave_room', 'sid': '456', 'namespace': '/',
'room': 'foo'})
super_leave_room.assert_called_once_with(sid, '/', 'foo')

def test_handle_close_room(self):
with mock.patch.object(
base_manager.BaseManager, 'close_room'
Expand All @@ -397,6 +444,8 @@ def test_background_thread(self):
self.pm._handle_emit = mock.MagicMock()
self.pm._handle_callback = mock.MagicMock()
self.pm._handle_disconnect = mock.MagicMock()
self.pm._handle_enter_room = mock.MagicMock()
self.pm._handle_leave_room = mock.MagicMock()
self.pm._handle_close_room = mock.MagicMock()
host_id = self.pm.host_id

Expand All @@ -411,6 +460,10 @@ def messages():
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield 'bad json'
yield b'bad pickled'

Expand Down Expand Up @@ -442,6 +495,14 @@ def messages():
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
)
self.pm._handle_enter_room.assert_called_once_with(
{'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
)
self.pm._handle_leave_room.assert_called_once_with(
{'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
)
self.pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz', 'host_id': 'x'}
)
Expand Down

0 comments on commit d222f4c

Please sign in to comment.