Skip to content

Meta quest callbacks, vibration and publish async #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions simpub/core/net_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,17 @@ def __init__(self, topic: str):

def publish(self, data: Dict):
msg = f"{self.topic}:{dumps(data)}"
self.socket.send_string(msg)
self.manager.submit_task(self.send_msg_async, msg)

def publish_string(self, string: str):
self.socket.send_string(f"{self.topic}:{string}")
self.manager.submit_task(self.send_msg_async, f"{self.topic}:{string}")

def on_shutdown(self):
super().on_shutdown()

async def send_msg_async(self, msg: str):
await self.socket.send_string(msg)


class Streamer(Publisher):
def __init__(
Expand All @@ -104,7 +107,7 @@ async def update_loop(self):
"updateData": self.update_func(),
"time": last,
}
self.socket.send_string(f"{self.topic}:{dumps(msg)}")
await self.socket.send_string(f"{self.topic}:{dumps(msg)}")


ResponseType = Union[str, bytes, Dict]
Expand Down Expand Up @@ -235,9 +238,9 @@ async def service_loop(self):
while self.running:
message = await self.service_socket.recv_string()
if ":" not in message:
logger.error(f"Invalid message with no spliter \":\"")
logger.error("Invalid message with no spliter \":\"")
await self.service_socket.send_string("Invalid message")
continue
continue
service, request = message.split(":", 1)
# the zmq service socket is blocked and only run one at a time
if service in self.service_list.keys():
Expand Down
38 changes: 30 additions & 8 deletions simpub/xr_device/meta_quest3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TypedDict, Callable, Dict
from typing import TypedDict, Callable, Dict, List
import json
from asyncio import sleep as async_sleep

from simpub.core.net_manager import Publisher
from .xr_device import XRDevice
Expand Down Expand Up @@ -38,25 +39,46 @@ def __init__(
self.input_subscriber = self.register_topic_callback(
f"{device_name}/InputData", self.update
)
self.viborate_publisher = Publisher(f"{device_name}/Vibration")
self.button_trigger_event: Dict[str, Callable] = {}
self.start_vib_pub = Publisher(f"{device_name}/StartVibration")
self.stop_vib_pub = Publisher(f"{device_name}/StopVibration")
self.button_trigger_event: Dict[str, List[Callable]] = {
"A": [],
"B": [],
"X": [],
"Y": [],
}
self.on_vibration = {"left": False, "right": False,}

def update(self, data: str):
self.last_input_data = self.input_data
self.input_data = json.loads(data)
if self.last_input_data is None:
return
for button, callback in self.button_trigger_event.items():
for button, callbacks in self.button_trigger_event.items():
if self.input_data[button] and not self.last_input_data[button]:
callback()
[callback() for callback in callbacks]

def register_button_trigger_event(self, button: str, callback: Callable):
# button should be one of A, B, X, Y
self.button_trigger_event[button] = callback
self.button_trigger_event[button].append(callback)

def get_input_data(self) -> MetaQuest3InputData:
return self.input_data

# TODO: Vibration Data Structure
def publish_vibrate(self, hand: str = "right"):
self.viborate_publisher.publish_string(hand)
def start_vibration(self, hand: str = "right", duration=0.5):
if not self.on_vibration[hand]:
self.on_vibration[hand] = True
self.manager.submit_task(
self.start_vibration_async, hand, duration,
)

async def start_vibration_async(self, hand: str = "right", duration=0.5):
self.start_vib_pub.publish_string(hand)
await async_sleep(duration)
self.stop_vib_pub.publish_string(hand)

def stop_vibration(self, hand: str = "right"):
if self.on_vibration[hand]:
self.on_vibration[hand] = False
self.stop_vib_pub.publish_string(hand)