Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ros2 keyboard fixed #134

Merged
merged 12 commits into from
Nov 2, 2022
1 change: 1 addition & 0 deletions jupyros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .ros2.publisher import *
from .ros2.ros_widgets import *
from .ros2.subscriber import *
from .ros2.key_input import *

else:
# Default to ROS1
Expand Down
1 change: 1 addition & 0 deletions jupyros/ros2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
from ..ros2.publisher import *
from ..ros2.ros_widgets import *
from ..ros2.subscriber import *
from ..ros2.key_input import *
107 changes: 107 additions & 0 deletions jupyros/ros2/key_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
## Keyboard input for Jupyter Ros 2
"""
Author: Luigi Dania
Email: Luigi@dobots.nl
Github: https://github.com/ldania

Company: Dobots
Company Repo: https://github.com/dobots/


"""

from ipywidgets import Output
from ipycanvas import Canvas


# Method to receive keyboard input commands and send a String message to a Subscriber
# It is recommended to use a secondary node to translate your String message to an appropriate msg type


class KeyInput:

# Initiate values
def __init__(self, node, msg_type, topic, key_bindings=None):
# Set default window size and colors
self.width = 400
self.height = 400
self.color = "#1E3888"
self.font_color = "#E3DFFF"

# Initiate values for ipycanvas to
self.canvas = Canvas()
self.canvas.fill_style = self.color
self.canvas.fill_rect(0, 0, self.width, self.height)

self.out = Output()
self.msg_inst = msg_type()
self.node = node
self.__publisher = self.node.create_publisher(msg_type, topic, 10)

self.print_outgoing_msg = True
self.canvas.text_align = "center"
self.key_bindings = key_bindings
self.smallest_size = min(self.width, self.height)

# Method to change the window color
def set_color(self, color):
self.color = color
self.canvas.fill_style = self.color
self.update()

# Method to change the window width
def set_width(self, width):
self.width = width
self.smallest_size = min(self.width, self.height)
self.update()

# Method to change the window height
def set_height(self, height):
self.height = height
self.smallest_size = min(self.width, self.height)
self.update()

def print_outgoing(self, var: bool):
self.print_outgoing_msg = var

def update(self):
self.canvas.clear()
self.canvas.fill_rect(0, 0, self.width, self.height)

# Method to display the screen and to receive keyboard inputs
def display_inputs(self):
self.update()

@self.out.capture()
def on_keyboard_event(key, shift_key, ctrl_key, meta_key):
if key:
if self.print_outgoing_msg:
self.canvas.fill_rect(0, 0, self.width, self.height)
if str(key) == "ArrowRight":
print_key = "⇒"
elif str(key) == "ArrowDown":
print_key = "⇓"
elif str(key) == "ArrowLeft":
print_key = "⇐"
elif str(key) == "ArrowUp":
print_key = "⇑"
else:
print_key = key

if len(str(print_key)) > 2:
factor = 5.5
else:
factor = 3

self.canvas.fill_style = self.font_color
self.font_size = self.smallest_size/factor
self.canvas.font = "{}px sans-serif".format(self.font_size)
self.canvas.fill_text(print_key, self.width/2, self.height/2+self.font_size/3)
self.canvas.fill_style = self.color

self.msg_inst.data = str(key)
self.__publisher.publish(self.msg_inst)

self.canvas.on_key_down(on_keyboard_event)
display(self.canvas)
display(self.out)
54 changes: 28 additions & 26 deletions jupyros/ros2/publisher.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
"""
Publisher class for jupyter-ros2 Project

Modified by: Luigi Dania
Email: Luigi@dobots.nl
Github: https://github.com/ldania

Company: Dobots
Company Repo: https://github.com/dobots/




Original Author: zmk5 (Zahi Kakish)

Adjusted by: ldania (Luigi Dania)
Date: 19 July 2022


"""
from typing import TypeVar
import threading
import time
import ipywidgets as widgets
from .ros_widgets import add_widgets
from .ros_widgets import add_widgets, rsetattr, rgetattr
import functools

def rsetattr(obj, attr, val):
pre, _, post = attr.rpartition('.')
return setattr(rgetattr(obj, pre) if pre else obj, post, val)

# using wonder's beautiful simplification: https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-objects/31174427?noredirect=1#comment86638618_31174427

def rgetattr(obj, attr, *args):
def _getattr(obj, attr):
return getattr(obj, attr, *args)
return functools.reduce(_getattr, [obj] + attr.split('.'))


try:
Expand Down Expand Up @@ -54,7 +54,7 @@ class Publisher():
:param topic: The topic name on which to publish the message.

"""
def __init__(self, node: Node, msg_type: MsgType, topic: str) -> None:
def __init__(self, node: Node, msg_type: MsgType, topic: str, rate = None ) -> None:
# Check if a ros2 node is provided.
if (not isinstance(node, Node) or not issubclass(type(node), Node)):
raise TypeError(
Expand All @@ -77,6 +77,7 @@ def __init__(self, node: Node, msg_type: MsgType, topic: str) -> None:
self.msg_type = msg_type
self.__publisher = self.node.create_publisher(msg_type, topic, 10)
self.__thread_map = {}
self.__thread_map[self.topic] = False
self.__widget_list = []
self.__widget_dict = {}
self.__widgets = {
Expand All @@ -85,7 +86,8 @@ def __init__(self, node: Node, msg_type: MsgType, topic: str) -> None:
"send_btn": widgets.Button(description="Send Message"),
"txt_input": widgets.Text(description="Message", value="Something")
}

if(rate):
self.node.create_timer(rate, self.__send_msg)
self.widget_dict, self.widget_list = add_widgets(self.msg_type, self.__widget_dict, self.__widget_list)

def widget_dict_to_msg(self):
Expand All @@ -107,10 +109,6 @@ def widget_dict_to_msg(self):
head_class = key.value


#submsg = getattr(msg_instance, key)
#self._sub_msg[key] =
#widget_dict_to_msg(submsg, d[key])



def display(self) -> widgets.VBox:
Expand All @@ -131,18 +129,25 @@ def display(self) -> widgets.VBox:

return vbox

def send_msg(self, args):
"""Call to send message directly"""
self._send_msg(args)


def __send_msg(self, args):


""" Generic call to send message. """
self.msg_inst = self.msg_type()
self.widget_dict_to_msg()
self.__publisher.publish(self.msg_inst)
#self.__publisher.publish()

if(self.widget_list):
self.widget_dict_to_msg()
self.__publisher.publish(self.msg_inst)
else:
self.__publisher.publish(args)

def send_msg(self, args):
"""Call to send message directly"""
self.__send_msg(args)

def __thread_target(self) -> None:
d = 1.0 / float(self.__widgets["rate_field"].value)
Expand All @@ -151,10 +156,7 @@ def __thread_target(self) -> None:
time.sleep(d)

def __start_thread(self, _) -> None:
try:
self.__thread_map[self.topic] = not self.__thread_map[self.topic]
except:
self.__thread_map[self.topic] = self.node
self.__thread_map[self.topic] = not self.__thread_map[self.topic]
if self.__thread_map[self.topic]:
local_thread = threading.Thread(target=self.__thread_target)
local_thread.start()
Expand Down
31 changes: 26 additions & 5 deletions jupyros/ros2/ros_widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import ipywidgets as widgets
from ament_index_python.packages import get_package_share_directory
import rosidl_runtime_py.utilities as rut

import functools



Expand Down Expand Up @@ -44,18 +44,28 @@ def add_widgets(msg_instance, widget_dict, widget_list, prefix=''):
msg_inst = msg_instance()
except:
msg_inst = msg_instance

attr = getattr(msg_inst, slot)


#s_t = msg_instance._fields_and_field_types[slot]

try:
## Determine if the Message is the a basic form of Messages such as Point, String etc
if(type(attr) in [int, str, float]):
msg_attr = msg_inst.get_fields_and_field_types()
basic_flag = 1
if(idx != 0):
continue

else:
msg_attr = attr.get_fields_and_field_types()
except:
next
basic_flag = 0


w = None

if(rut.is_message(msg_instance)):
if(basic_flag == 1):
slot = type(msg_inst).__name__
widget_list.append(widgets.Label(value=slot))

widget_dict[slot] = {}
Expand Down Expand Up @@ -96,3 +106,14 @@ def add_widgets(msg_instance, widget_dict, widget_list, prefix=''):
else:
print("No inst")
"""

def rsetattr(obj, attr, val):
pre, _, post = attr.rpartition('.')
return setattr(rgetattr(obj, pre) if pre else obj, post, val)

# using wonder's beautiful simplification: https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-objects/31174427?noredirect=1#comment86638618_31174427

def rgetattr(obj, attr, *args):
def _getattr(obj, attr):
return getattr(obj, attr, *args)
return functools.reduce(_getattr, [obj] + attr.split('.'))
34 changes: 26 additions & 8 deletions jupyros/ros2/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
"""
Subscription class for jupyter-ros2 Project
Subscriber class for jupyter-ros2 Project

Modified by: Luigi Dania
Email: Luigi@dobots.nl
Github: https://github.com/ldania

Company: Dobots
Company Repo: https://github.com/dobots/


Modified by: ldania (Luigi Dania)
Date: 22-July-2022


Original Author: zmk5 (Zahi Kakish)
Expand All @@ -11,6 +17,7 @@
"""
from typing import TypeVar
from typing import Callable
import time
import threading
import functools
import ipywidgets as widgets
Expand Down Expand Up @@ -72,6 +79,18 @@ def __init__(self, node: Node, msg_type: MsgType, topic: str,
self.msg_type = msg_type
self.data = None
self.__thread_state = False
self.callback= callback




"""
Different than Ros1, in Ros2 an Multithreadedexecutor is needed for multiple silmultaneous nodes
"""
self.executor = rclpy.executors.MultiThreadedExecutor()
self.executor.add_node(self.node)
self.rate = self.node.create_rate(2)

self.__widgets = {
"start_btn": widgets.Button(description='Start'),
"stop_btn": widgets.Button(description='Stop'),
Expand Down Expand Up @@ -102,18 +121,17 @@ def display(self) -> widgets.VBox:
return vbox


def __thread_target(self) -> None:
while self.__thread_state:
rclpy.spin_once(self.node, timeout_sec=1)
self.__widgets["out"].append_stdout("Done!\n")

def _stop_subscription(self, _) -> None:
self.__thread_state = False

def _start_subscription(self, _) -> None:
self.__thread_state = True
local_thread = threading.Thread(target=self.__thread_target)

print("started")
local_thread = threading.Thread( target = self.executor.spin, daemon = True)
local_thread.start()
self.rate.sleep()

def __data_msg(self, func: Callable) -> Callable:
"""
Expand Down
Loading