Skip to content

solve loading the massive texture and simplifing the simdata #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,7 @@ cython_debug/
#.idea/

# Mujoco
MUJOCO_LOG.TXT
MUJOCO_LOG.TXT

# vscode
.vscode/
86 changes: 86 additions & 0 deletions demos/robocasa/kitchen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import argparse
import json
import time
from collections import OrderedDict

import numpy as np
import robosuite
from robosuite.controllers import load_composite_controller_config
from robosuite.wrappers import VisualizationWrapper
from termcolor import colored

from robocasa.models.scenes.scene_registry import LayoutType, StyleType
from robocasa.scripts.collect_demos import collect_human_trajectory

from simpub.sim.robocasa_publisher import RobocasaPublisher

if __name__ == "__main__":
# Arguments
parser = argparse.ArgumentParser()
parser.add_argument("--task", type=str, default="PnPCounterToCab", help="task")
parser.add_argument("--layout", type=int, help="kitchen layout (choose number 0-9)")
parser.add_argument("--style", type=int, help="kitchen style (choose number 0-11)")
parser.add_argument("--robot", type=str, help="robot", default="PandaOmron")
args = parser.parse_args()

raw_layouts = dict(
map(lambda item: (item.value, item.name.lower().capitalize()), LayoutType)
)
layouts = OrderedDict()
for k in sorted(raw_layouts.keys()):
if k < -0:
continue
layouts[k] = raw_layouts[k]

raw_styles = dict(
map(lambda item: (item.value, item.name.lower().capitalize()), StyleType)
)
styles = OrderedDict()
for k in sorted(raw_styles.keys()):
if k < 0:
continue
styles[k] = raw_styles[k]

# Create argument configuration
config = {
"env_name": args.task,
"robots": args.robot,
"controller_configs": load_composite_controller_config(robot=args.robot),
"translucent_robot": False,
}

args.renderer = "mjviewer"

print(colored("Initializing environment...", "yellow"))

env = robosuite.make(
**config,
has_renderer=True,
has_offscreen_renderer=False,
render_camera=None,
ignore_done=True,
use_camera_obs=False,
control_freq=20,
renderer=args.renderer,
)

# Grab reference to controller config and convert it to json-encoded string
env_info = json.dumps(config)

env.reset()

ep_meta = env.get_ep_meta()
# print(json.dumps(ep_meta, indent=4))
lang = ep_meta.get("lang", None)

# degugging: code block here to quickly test and close env
# env.close()
# return None, True
env.render()

publisher = RobocasaPublisher(env)

while True:
zero_action = np.zeros(env.action_dim)
obs, _, _, _ = env.step(zero_action)
env.render()
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

setup(
name='simpub',
version='0.1',
install_requires=["zmq", "trimesh", "pillow", "numpy", "scipy", "colorama"],
version='1.1',
install_requires=["zmq", "pillow", "numpy", "scipy", "colorama"],
include_package_data=True,
packages=['simpub', 'simpub.parser', 'simpub.sim']
)
93 changes: 56 additions & 37 deletions simpub/core/net_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class ServerPort(int, enum.Enum):

class ClientPort(int, enum.Enum):
DISCOVERY = 7720
SERVICE = 7723
TOPIC = 7724
SERVICE = 7730
TOPIC = 7731


class HostInfo(TypedDict):
Expand All @@ -43,6 +43,19 @@ class HostInfo(TypedDict):
services: List[ServiceName]


class SimPubClient:

def __init__(self, client_info: HostInfo) -> None:
self.manager: NetManager = NetManager.manager
self.info = client_info
self.req_socket: zmq.Socket = self.manager.zmq_context.socket(zmq.REQ)
self.sub_socket: zmq.Socket = self.manager.zmq_context.socket(zmq.SUB)
self.req_socket.connect(
f"tcp://{client_info['ip']}:{ClientPort.SERVICE.value}")
self.sub_socket.connect(
f"tcp://{client_info['ip']}:{ClientPort.TOPIC.value}")


class Communicator(abc.ABC):

def __init__(self):
Expand All @@ -65,7 +78,10 @@ def __init__(self, topic: str):
super().__init__()
self.topic = topic
self.socket = self.manager.pub_socket
self.manager.register_local_topic(self.topic)
if topic in self.manager.local_info["topics"]:
logger.warning(f"Host {topic} is already registered")
else:
self.manager.local_info["topics"].append(topic)
logger.info(f"Publisher for topic \"{self.topic}\" is ready")

def publish(self, data: Dict):
Expand Down Expand Up @@ -110,28 +126,16 @@ async def update_loop(self):
await self.socket.send_string(f"{self.topic}:{dumps(msg)}")


ResponseType = Union[str, bytes, Dict]


class Service(Communicator):

def __init__(
self,
service_name: str,
callback: Callable[[str], ResponseType],
respnse_type: ResponseType,
callback: Callable[[str], Union[str, bytes, Dict]],
) -> None:
super().__init__()
self.service_name = service_name
self.callback_func = callback
if respnse_type == str:
self.sender = self.send_string
elif respnse_type == bytes:
self.sender = self.send_bytes
elif respnse_type == Dict:
self.sender = self.send_dict
else:
raise ValueError("Invalid response type")
self.socket = self.manager.service_socket
# register service
self.manager.local_info["services"].append(service_name)
Expand All @@ -145,19 +149,32 @@ async def callback(self, msg: str):
),
timeout=5.0,
)
await self.sender(result)
await self.send(result)

@abc.abstractmethod
async def send(self, string: str):
raise NotImplementedError

def on_shutdown(self):
return super().on_shutdown()


class StringService(Service):

async def send_string(self, string: str):
async def send(self, string: str):
await self.socket.send_string(string)

async def send_bytes(self, data: bytes):

class BytesService(Service):

async def send(self, data: bytes):
await self.socket.send(data)

async def send_dict(self, data: Dict):
await self.socket.send_string(dumps(data))

def on_shutdown(self):
return super().on_shutdown()
class DictService(Service):

async def send(self, data: Dict):
await self.socket.send_string(dumps(data))


class NetManager:
Expand Down Expand Up @@ -187,8 +204,11 @@ def __init__(
self.local_info["ip"] = host_ip
self.local_info["topics"] = []
self.local_info["services"] = []
# host info
self.clients_info: Dict[str, HostInfo] = {}
# client info
self.on_client_registered: List[
Callable[[HostInfo], None]
] = list()
self.clients: Dict[IPAddress, SimPubClient] = {}
# setting up thread pool
self.running: bool = True
self.loop: asyncio.AbstractEventLoop = None
Expand All @@ -208,11 +228,11 @@ def start_event_loop(self):
while not self._initialized:
time.sleep(0.01)
# default service for client registration
self.register_service = Service(
"Register", self.register_client_callback, str
self.register_service = StringService(
"Register", self.register_client_callback
)
self.server_timestamp_service = Service(
"GetServerTimestamp", self.get_server_timestamp_callback, str
self.server_timestamp_service = StringService(
"GetServerTimestamp", self.get_server_timestamp_callback
)
# default task for client registration
self.submit_task(self.broadcast_loop)
Expand Down Expand Up @@ -280,21 +300,20 @@ async def broadcast_loop(self):
def register_client_callback(self, msg: str) -> str:
# NOTE: something woring with sending message, but it solved somehow
client_info: HostInfo = json.loads(msg)
client_name = client_info["name"]
# NOTE: the client info may be updated so the reference cannot be used
# NOTE: TypeDict is somehow block if the key is not in the dict
self.clients_info[client_name] = client_info
logger.info(f"Host \"{client_name}\" has been registered")
if client_info["ip"] not in self.clients:
self.clients[client_info['ip']] = SimPubClient(client_info)
for callback in self.on_client_registered:
callback(self.clients[client_info['ip']])
logger.info(
f"Host \"{client_info['name']}\" with"
f"IP \"{client_info['ip']}\" has been registered")
return "The info has been registered"

def get_server_timestamp_callback(self, msg: str) -> str:
return str(time.monotonic())

def register_local_topic(self, topic: TopicName):
if topic in self.local_info["topics"]:
logger.warning(f"Host {topic} is already registered")
self.local_info["topics"].append(topic)

def shutdown(self):
logger.info("Shutting down the server")
self.pub_socket.close(0)
Expand Down
43 changes: 30 additions & 13 deletions simpub/core/simpub_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from typing import Dict, List

from simpub.simdata import SimScene
from .net_manager import init_net_manager
from .net_manager import Streamer, Service
from .net_manager import init_net_manager, asycnc_sleep
from .net_manager import Streamer, BytesService, SimPubClient
from .log import logger
from .utils import send_message


class ServerBase(abc.ABC):
Expand Down Expand Up @@ -49,20 +50,36 @@ def __init__(
else:
self.no_tracked_objects = no_tracked_objects
super().__init__(host)
self.net_manager.on_client_registered.append(
self.on_xr_client_registered
)

def initialize(self):
self.scene_update_streamer = Streamer("SceneUpdate", self.get_update)
self.scene_service = Service("Scene", self._on_scene_request, str)
self.asset_service = Service("Asset", self._on_asset_request, bytes)

def _on_scene_request(self, req: str) -> str:
return self.sim_scene.to_string()

def _on_asset_request(self, tag: str) -> bytes:
if tag not in self.sim_scene.raw_data:
logger.warning("Received invalid data request")
return
return self.sim_scene.raw_data[tag]
# self.scene_service = BytesService("Scene", self._on_scene_request)
self.asset_service = BytesService("Asset", self._on_asset_request)

def on_xr_client_registered(self, client: SimPubClient):
if "LoadSimScene" in client.info["services"]:
scene_string = f"LoadSimScene:{self.sim_scene.to_string()}"
self.net_manager.submit_task(
send_message, scene_string, client.req_socket
)
# try:
# result = future.result()
# return result
# except Exception as e:
# logger.error(
# f"Find a new error when waiting for a response: {e}"
# )

def _on_asset_request(self, req: str) -> bytes:
return self.sim_scene.raw_data[req]

async def check_and_send_scene_update(self):
for client in self.net_manager.clients.values():
await client.req_socket.send_string("LoadSimScene:")
await asycnc_sleep(0.05)

@abc.abstractmethod
def get_update(self) -> Dict:
Expand Down
13 changes: 13 additions & 0 deletions simpub/core/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import zmq
from .log import logger


async def send_message(msg: str, socket: zmq.Socket):
try:
await socket.send_string(msg)
except Exception as e:
logger.error(
f"Error when sending message from send_message function in "
f"simpub.core.utils: {e}"
)
return await socket.recv_string()
Loading