Skip to content

Robust bug topic negotiation #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,4 @@ dmypy.json

# Pyre type checker
.pyre/
.vscode
145 changes: 79 additions & 66 deletions src/uros/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@
uio: packet buffer
struct: serlization
_TopicInfo: for topic negotiation
already provided in rosserial_msgs
already provided in rosserial_msgs
"""

import sys
import logging
import machine as m
import uio
import ustruct as struct
from time import sleep, sleep_ms, sleep_us
from rosserial_msgs import TopicInfo
import sys
import os
import logging

# for now threads are used, will be changed with asyncio in the future
if sys.platform == "esp32":
Expand All @@ -29,17 +26,18 @@

# class to manage publish and subscribe
# COULD BE CHANGED AFTERWARDS
class NodeHandle(object):
def __init__(self, serial_id=2, baudrate=115200, **kwargs):
class NodeHandle:
"""Initiates connection through rosserial using UART ports."""

def __init__(self, serial_id=2, baudrate=115200, **kwargs):
"""
id: used for topics id (negotiation)
advertised_topics: manage already negotiated topics
subscribing_topics: topics to which will be subscribed are here
serial_id: uart id
baudrate: baudrate used for serial comm
"""
self.id = 101
id: used for topics id (negotiation)
advertised_topics: manage already negotiated topics
subscribing_topics: topics to which will be subscribed are here
serial_id: uart id
baudrate: baudrate used for serial comm
"""
self._id = 101
self.advertised_topics = dict()
self.subscribing_topics = dict()
self.serial_id = serial_id
Expand Down Expand Up @@ -69,28 +67,18 @@ def __init__(self, serial_id=2, baudrate=115200, **kwargs):

# method to manage and advertise topic
# before publishing or subscribing
def _advertise_topic(self, topic_name, msg, endpoint, buffer_size):

def _advertise_topic(self, topic_name, _id, msg, endpoint, buffer_size):
"""
topic_name: eg. (Greet)
msg: message object
endpoint: corresponds to TopicInfo.msg typical topic id values
"""
topic_name: eg. (Greet)
msg: message object
endpoint: corresponds to TopicInfo.msg typical topic id values
"""
register = TopicInfo()
register.topic_id = self.id
register.topic_id = _id
register.topic_name = topic_name
register.message_type = msg._type
register.md5sum = msg._md5sum

self.advertised_topics[topic_name] = self.id

# id are summed by one
self.id += 1

try:
register.buffer_size = buffer_size
except Exception as e:
logging.info("No buffer size could be defined for topic negotiation.")
register.buffer_size = buffer_size

# serialization
packet = uio.StringIO()
Expand All @@ -101,17 +89,31 @@ def _advertise_topic(self, topic_name, msg, endpoint, buffer_size):
length = len(packet)

# both checksums
crclen = [checksum(le(length))]
crcpack = [checksum(le(endpoint) + packet)]
crclen = [checksum(_le(length))]
crcpack = [checksum(_le(endpoint) + packet)]

# final packet to be sent
fpacket = header + le(length) + crclen + le(endpoint) + packet + crcpack
fpacket = header + _le(length) + crclen + _le(endpoint) + packet + crcpack
self.uart.write(bytearray(fpacket))

def _advertise_all_topics(self):
for key, value in self.advertised_topics.items():
self._advertise_topic(key, value[0], value[1], value[2], value[3])

def publish(self, topic_name, msg, buffer_size=1024):
"""publishes messages to topics

Args:
topic_name (string): name of destination topic in ROS network.
msg (ROS message): custom message object generated by ugenpy.
buffer_size (int, optional): maximum size of buffer for message. Defaults to 1024.
"""

if topic_name not in self.advertised_topics:
self._advertise_topic(topic_name, msg, 0, buffer_size)
self.advertised_topics[topic_name] = [self._id, msg, 0, buffer_size]
# id are summed by one
self._advertise_topic(topic_name, self._id, msg, 0, buffer_size)
self._id += 1

# same as advertise
packet = uio.StringIO()
Expand All @@ -120,23 +122,33 @@ def publish(self, topic_name, msg, buffer_size=1024):
packet = list(packet.getvalue().encode("utf-8"))
length = len(packet)

topic_id = le(self.advertised_topics.get(topic_name))
crclen = [checksum(le(length))]
topic_id = _le(self.advertised_topics.get(topic_name)[0])
crclen = [checksum(_le(length))]
crcpack = [checksum(topic_id + packet)]

fpacket = header + le(length) + crclen + topic_id + packet + crcpack
fpacket = header + _le(length) + crclen + topic_id + packet + crcpack
self.uart.write(bytearray(fpacket))

def subscribe(self, topic_name, msgobj, cb, buffer_size=1024):
assert cb is not None, "Subscribe callback is not set"
def subscribe(self, topic_name, msg, _cb, buffer_size=1024):
"""subscribes to a topic receiving messages and processing them by a callback function

Args:
topic_name (string): name of destiny topic to send messages.
msg (ROS message): custom message object generated by ugenpy.
cb (function): callback function to process incoming messages.
buffer_size (int, optional): maximum size of buffer for message. Defaults to 1024.
"""
assert _cb is not None, "Subscribe callback is not set"

# subscribing topic attributes are added
self.subscribing_topics[self.id] = [msgobj, cb]
self.subscribing_topics[self._id] = [msg, _cb]

# advertised if not already subscribed
if topic_name not in self.advertised_topics:
msg = msgobj()
self._advertise_topic(topic_name, msg, 1, buffer_size)
self.advertised_topics[topic_name] = [self._id, msg, 1, buffer_size]
# id are summed by one
self._advertise_topic(topic_name, self._id, msg, 1, buffer_size)
self._id += 1

def _listen(self):
while True:
Expand Down Expand Up @@ -166,46 +178,47 @@ def _listen(self):
try:
# incoming object msg initialized
msgobj = self.subscribing_topics.get(inid)[0]
except Exception:
logging.info("TX request was made or got message from not available subscribed topic.")
except (OSError, TypeError, IndexError):
logging.info(
"TX request was made or got message from"
+ "not available subscribed topic."
)
# object sent to callback
callback = self.subscribing_topics.get(inid)[1]
fdata = msgobj()
fdata = fdata.deserialize(msgdata)
callback(fdata)
else:
raise ValueError("Message plus Topic ID Checksum is wrong!")
else:
self._advertise_all_topics()

except Exception as e:
except (OSError, TypeError, ValueError):
logging.info("No incoming data could be read for subscribes.")


# functions to be used in class
def word(l, h):
def word(_l, _h):
"""
Given a low and high bit, converts the number back into a word.
"""
Given a low and high bit, converts the number back into a word.
"""
return (h << 8) + l
return (_h << 8) + _l


# checksum method, receives array
def checksum(arr):
return 255 - ((sum(arr)) % 256)

"""Generates checksum value of message.

# little-endian method
def le(h):
h &= 0xFFFF
return [h & 0xFF, h >> 8]
Args:
arr (list): list of hexadecimal values.

Returns:
[int]: returns an value between 0 and 256
"""
return 255 - ((sum(arr)) % 256)

# example code
if __name__ == "__main__":
from std_msgs import String
from uros import NodeHandle

msg = String()
msg.data = "HiItsMeMario"
node = NodeHandle(2, 115200)
while True:
node.publish("greet", msg)
# little-endian method
def _le(_h):
_h &= 0xFFFF
return [_h & 0xFF, _h >> 8]