
Refactor server.py to use Asyncio and Non-blocking I/O for improved scalability #227

@from2001

Description


Problem Description

Currently, server.py uses the threading module combined with blocking ZeroMQ calls. While this works for a moderate number of clients, it runs into scalability limits due to Python's GIL (Global Interpreter Lock) and the overhead of OS-level thread context switching.

In high-load LBE (Location-Based Entertainment) scenarios—such as 100+ clients sending transforms at 30Hz—this architecture can lead to:

  • Increased CPU usage due to thread contention.
  • Latency spikes (lag) caused by lock contention on _rooms_lock.
  • Difficulty in integrating with modern async-native libraries (e.g., the FastAPI REST bridge currently runs in a separate thread).

Proposed Solution

Migrate the server architecture to an Asyncio-based event loop model using zmq.asyncio (native async support in pyzmq).

By moving to a single-threaded, non-blocking I/O model, we can handle thousands of concurrent connections efficiently, similar to Node.js or Nginx architectures.
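
pyzmq ships native asyncio support, so the change at the socket layer is mostly a context swap. A minimal sketch of the setup (the bind endpoint below is a placeholder, not taken from server.py):

import zmq
import zmq.asyncio

ctx = zmq.asyncio.Context()       # drop-in replacement for zmq.Context
router = ctx.socket(zmq.ROUTER)   # same socket API as the blocking version
router.bind("tcp://*:5555")       # placeholder endpoint

# send/recv calls on this socket now return awaitables instead of blocking,
# e.g. msg = await router.recv_multipart() inside a coroutine.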

Implementation Plan

  • Replace Threading with Asyncio: Convert NetSyncServer to run on an asyncio event loop instead of spawning receive_thread and periodic_thread.
  • Adopt Non-blocking ZMQ: Switch from standard zmq.Context to zmq.asyncio.Context and use await for socket operations.
  • Refactor Main Loops:
    • Convert _receive_loop to an async def coroutine.
    • Convert _periodic_loop (cleanup/broadcasting) to an async def coroutine using asyncio.sleep() (see the sketch after this list).
  • Simplify Locking: Reduce or eliminate _rooms_lock usage, as the single-threaded event loop naturally prevents many race conditions.
  • Integrate REST Bridge: Run the FastAPI application within the same event loop (removing the need for a separate thread for rest_bridge.py).
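
As referenced above, a rough sketch of the _periodic_loop conversion, in the same concept style as the examples below. The interval and the helper calls inside the loop are assumptions for illustration, not the actual methods in server.py:

async def _periodic_loop(self):
    while self.running:
        self.cleanup_rooms()       # hypothetical helper: stale-room cleanup
        self.broadcast_updates()   # hypothetical helper: periodic broadcast
        # asyncio.sleep() yields to the event loop instead of blocking a thread
        await asyncio.sleep(1.0 / 30)  # interval is an assumption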

Code Example (Concept)

Current (Blocking):

def _receive_loop(self):
    while self.running:
        # Blocking poll with a 100 ms timeout; this loop ties up a dedicated OS thread
        if self.router.poll(100):
            msg = self.router.recv_multipart()
            self.process_message(msg)

Proposed (Async):

async def _receive_loop(self):
    while self.running:
        # Awaiting yields control to the event loop instead of blocking a thread
        msg = await self.router.recv_multipart()
        # Handle the message concurrently; assumes process_message is also
        # refactored into an async def coroutine
        asyncio.create_task(self.process_message(msg))
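
A rough sketch of how the entry point could then tie everything into one event loop, including the FastAPI bridge served by uvicorn. The port, the uvicorn wiring, and the assumption that rest_bridge exposes an app object are illustrative, not taken from the current code:

import asyncio
import uvicorn
from rest_bridge import app  # assumption: rest_bridge.py exposes the FastAPI app

async def main():
    server = NetSyncServer()  # assumes the async-refactored server described above

    # uvicorn.Server.serve() is a coroutine, so the REST bridge can share the
    # event loop instead of running in its own thread
    rest = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=8000))

    await asyncio.gather(
        server._receive_loop(),
        server._periodic_loop(),
        rest.serve(),
    )

if __name__ == "__main__":
    asyncio.run(main())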

Benefits

  • High Scalability: Efficiently handles C10K-level concurrency.
  • Reduced Latency: Minimizes overhead from thread switching.
  • Code Simplicity: Reduces the complexity of thread synchronization and locking.
