Skip to content

Feat/asyncio #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions demos/PointCloudLabeling/point_cloud_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from simpub.core.net_manager import init_net_manager
from simpub.xr_device.xr_device import XRDevice


def generate_point_cloud_data(rgb_image, depth_image):
positions = []
colors = []
Expand Down
4 changes: 2 additions & 2 deletions demos/SimulationFramework/pick_and_place.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from alr_sim.sims.universal_sim.PrimitiveObjects import Box
from simpub.sim.sf_publisher import SFPublisher
from simpub.xr_device.xr_device import XRDevice
from simpub.xr_device.meta_quest3 import MetaQuest3

if __name__ == "__main__":

Expand Down Expand Up @@ -131,4 +130,5 @@
home_position, home_orientation, duration=duration
)


while True:
scene.next_step()
231 changes: 174 additions & 57 deletions simpub/core/net_manager.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import abc
import enum
from typing import List, Dict, NewType, Callable, TypedDict
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Dict
from typing import NewType, Callable, TypedDict, Union
import asyncio
from asyncio import sleep as asycnc_sleep
import threading
import zmq
import zmq.asyncio
import socket
from socket import AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST
import struct
import time
from time import sleep
import json
import uuid
from .log import logger
from json import dumps
import zmq
import abc

IPAddress = NewType("IPAddress", str)
Topic = NewType("Topic", str)
Service = NewType("Service", str)
TopicName = NewType("TopicName", str)
ServiceName = NewType("ServiceName", str)


class ServerPort(int, enum.Enum):
Expand All @@ -32,11 +37,11 @@ class ClientPort(int, enum.Enum):
class HostInfo(TypedDict):
name: str
ip: IPAddress
topics: List[Topic]
services: List[Service]
topics: List[TopicName]
services: List[ServiceName]


class ConnectionAbstract(abc.ABC):
class Communicator(abc.ABC):

def __init__(self):
self.running: bool = False
Expand All @@ -53,6 +58,101 @@ def on_shutdown(self):
raise NotImplementedError


class Publisher(Communicator):
def __init__(self, topic: str):
super().__init__()
self.topic = topic
self.socket = self.manager.pub_socket
self.manager.register_local_topic(self.topic)
logger.info(f"Publisher for topic \"{self.topic}\" is ready")

def publish(self, data: Dict):
msg = f"{self.topic}:{dumps(data)}"
self.socket.send_string(msg)

def publish_string(self, string: str):
self.socket.send_string(f"{self.topic}:{string}")

def on_shutdown(self):
super().on_shutdown()


class Streamer(Publisher):
def __init__(
self,
topic: str,
update_func: Callable[[], Dict],
fps: int = 45,
):
super().__init__(topic)
self.running = False
self.dt: float = 1 / fps
self.update_func = update_func
self.manager.submit_task(self.update_loop)

async def update_loop(self):
self.running = True
last = 0.0
while self.running:
diff = time.monotonic() - last
if diff < self.dt:
await asycnc_sleep(self.dt - diff)
last = time.monotonic()
msg = {
"updateData": self.update_func(),
"time": last,
}
self.socket.send_string(f"{self.topic}:{dumps(msg)}")


ResponseType = Union[str, bytes, Dict]


class Service(Communicator):

def __init__(
self,
service_name: str,
callback: Callable[[str], ResponseType],
respnse_type: ResponseType,
) -> None:
super().__init__()
self.service_name = service_name
self.callback_func = callback
if respnse_type == str:
self.sender = self.send_string
elif respnse_type == bytes:
self.sender = self.send_bytes
elif respnse_type == Dict:
self.sender = self.send_dict
else:
raise ValueError("Invalid response type")
self.socket = self.manager.service_socket
# register service
self.manager.local_info["services"].append(service_name)
self.manager.service_list[service_name] = self
logger.info(f"\"{self.service_name}\" Service is ready")

async def callback(self, msg: str):
result = await asyncio.wait_for(
asyncio.to_thread(self.callback_func, msg),
timeout=5
)
await self.sender(result)

async def send_string(self, string: str):
await self.socket.send_string(string)

async def send_bytes(self, data: bytes):
await self.socket.send(data)

async def send_dict(self, data: Dict):
await self.socket.send_string(dumps(data))

def on_shutdown(self):
return super().on_shutdown()


class NetManager:

manager = None
Expand All @@ -64,17 +164,16 @@ def __init__(
) -> None:
NetManager.manager = self
self._initialized = True
self.zmq_context = zmq.Context()
self.zmq_context = zmq.asyncio.Context()
# subscriber
self.sub_socket_dict: Dict[IPAddress, zmq.Socket] = {}
self.topic_callback: Dict[Topic, Callable] = {}
# publisher
self.pub_socket = self.zmq_context.socket(zmq.PUB)
self.pub_socket.bind(f"tcp://{host_ip}:{ServerPort.TOPIC}")
# service
self.service_socket = self.zmq_context.socket(zmq.REP)
self.service_socket.bind(f"tcp://{host_ip}:{ServerPort.SERVICE}")
self.service_callback: Dict[str, Callable] = {}
self.service_list: Dict[str, Service] = {}
# message for broadcasting
self.local_info = HostInfo()
self.local_info["host"] = host_name
Expand All @@ -85,45 +184,69 @@ def __init__(
self.clients_info: Dict[str, HostInfo] = {}
# setting up thread pool
self.running: bool = True
self.executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=10)
self.futures: List[Future] = []
self.loop: asyncio.AbstractEventLoop = None
self.start_server_thread()

def start_server_thread(self) -> None:
"""
Start a thread for service.

Args:
block (bool, optional): main thread stop running and
wait for server thread. Defaults to False.
"""
self.server_thread = threading.Thread(target=self.start_event_loop)
self.server_thread.daemon = True
self.server_thread.start()
while self.loop is None:
time.sleep(0.01)

def start_event_loop(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# default task for client registration
self.submit_task(self.broadcast_loop)
self.submit_task(self.service_loop)
self.loop.run_forever()

def submit_task(self, task: Callable, *args):
# NOTE: It wouldn't stop even if any thread throws an exception
future = self.executor.submit(task, *args)
self.futures.append(future)
asyncio.run_coroutine_threadsafe(task(*args), self.loop)

def stop_server(self):
if self.loop.is_running():
asyncio.run_coroutine_threadsafe(self.loop.stop(), self.loop)
self.server_thread.join()

def join(self):
for future in self.futures:
future.result()
self.executor.shutdown()

def service_loop(self):
try:
logger.info("The service is running...")
# default service for client registration
self.register_local_service(
"Register", self.register_client_callback
)
self.register_local_service(
"GetServerTimestamp", self.get_server_timestamp_callback
)
while self.running:
message = self.service_socket.recv_string()
service, request = message.split(":", 1)
if service in self.service_callback.keys():
# the zmq service socket is blocked and only run one at a time
self.service_callback[service](request, self.service_socket)
else:
self.service_socket.send_string("Invild Service")
except Exception as e:
logger.error(f"Service Loop from Net Manager throw an exception of {e}")
finally:
logger.info("Service has been stopped")

def broadcast_loop(self):
self.server_thread.join()

async def service_loop(self):
# try:
logger.info("The service is running...")
# default service for client registration
self.register_service = Service(
"Register", self.register_client_callback, str
)
self.server_timestamp_service = Service(
"GetServerTimestamp", self.get_server_timestamp_callback, str
)
while self.running:
message = await self.service_socket.recv_string()
service, request = message.split(":", 1)
# the zmq service socket is blocked and only run one at a time
if service in self.service_list.keys():
try:
await self.service_list[service].callback(request)
except asyncio.TimeoutError:
logger.error(
"Timeout: callback function took too long to execute"
)
await self.service_socket.send_string("Timeout")
except Exception as e:
logger.error(f"Error: {e}")
await asycnc_sleep(0.01)

async def broadcast_loop(self):
logger.info("The server is broadcasting...")
# set up udp socket
_socket = socket.socket(AF_INET, SOCK_DGRAM)
Expand All @@ -138,33 +261,27 @@ def broadcast_loop(self):
while self.running:
msg = f"SimPub:{_id}:{json.dumps(local_info)}"
_socket.sendto(msg.encode(), (broadcast_ip, ServerPort.DISCOVERY))
sleep(0.5)
await asycnc_sleep(0.1)
logger.info("Broadcasting has been stopped")

def register_client_callback(self, msg: str, socket: zmq.Socket):
def register_client_callback(self, msg: str) -> str:
# NOTE: something woring with sending message, but it solved somehow
socket.send_string("The info has been registered")
client_info: HostInfo = json.loads(msg)
client_name = client_info["name"]
# NOTE: the client info may be updated so the reference cannot be used
# NOTE: TypeDict is somehow block if the key is not in the dict
self.clients_info[client_name] = client_info
logger.info(f"Host \"{client_name}\" has been registered")
return "The info has been registered"

def get_server_timestamp_callback(self, msg: str, socket: zmq.Socket):
socket.send_string(str(time.monotonic()))
def get_server_timestamp_callback(self, msg: str) -> str:
return str(time.monotonic())

def register_local_topic(self, topic: Topic):
def register_local_topic(self, topic: TopicName):
if topic in self.local_info["topics"]:
logger.warning(f"Host {topic} is already registered")
self.local_info["topics"].append(topic)

def register_local_service(
self, service: str, callback: Callable
) -> None:
self.local_info["services"].append(service)
self.service_callback[service] = callback

def shutdown(self):
logger.info("Shutting down the server")
self.pub_socket.close(0)
Expand Down
53 changes: 0 additions & 53 deletions simpub/core/publisher.py

This file was deleted.

Loading