diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index 0c8295ca0..34f411bc0 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -1637,3 +1637,65 @@ def assert_liveliness(self) -> None: """ with self.handle as capsule: _rclpy.rclpy_assert_liveliness(capsule) + + def _get_info_by_topic( + self, + topic_name: str, + no_mangle: bool, + func: Callable[[object, str, bool], List[Dict]]) -> List[Dict]: + fq_topic_name = expand_topic_name(topic_name, self.get_name(), self.get_namespace()) + validate_topic_name(fq_topic_name) + with self.handle as node_capsule: + return func(node_capsule, fq_topic_name, no_mangle) + + def get_publishers_info_by_topic( + self, topic_name: str, no_mangle: bool = False) -> List[Dict]: + """ + Return a list of publishers publishing to a given topic. + + The returned parameter is a list of dictionaries, where each dictionary will + contain the node name, node namespace, topic type, participant's GID and its QoS profile. + + 'topic_name' may be a relative, private or fully qualified topic name. + A relative or private topic will be expanded using this node's namespace and name, if + the 'no_mangle' param is set to false. + + The queried topic name is not remapped. + + 'no_mangle' defaults to false. + + :param topic_name: the topic_name on which to find the publishers. + :param no_mangle: if false, the given topic name will be expanded + to its fully qualified name. + :return: a list of dictionaries representing all the publishers on this topic. + """ + return self._get_info_by_topic( + topic_name, + no_mangle, + _rclpy.rclpy_get_publishers_info_by_topic) + + def get_subscriptions_info_by_topic( + self, topic_name: str, no_mangle: bool = False) -> List[Dict]: + """ + Return a list of subscriptions to a given topic. + + The returned parameter is a list of dictionaries, where each dictionary will contain + the node name, node namespace, topic type, participant's GID and its QoS profile. + + 'topic_name' may be a relative, private or fully qualified topic name. + A relative or private topic will be expanded using this node's namespace and name, if + the 'no_mangle' param is set to false. + + The queried topic name is not remapped. + + 'no_mangle' defaults to false. + + :param topic_name: the topic_name on which to find the subscriptions. + :param no_mangle: if false, the given topic name will be expanded + to its fully qualified name. + :return: a list of dictionaries representing all the subscriptions on this topic. + """ + return self._get_info_by_topic( + topic_name, + no_mangle, + _rclpy.rclpy_get_subscriptions_info_by_topic) diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index 4d24d510b..ab32c5329 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -851,6 +852,91 @@ rclpy_count_subscribers(PyObject * Py_UNUSED(self), PyObject * args) return _count_subscribers_publishers(args, "subscribers", rcl_count_subscribers); } +typedef rcl_ret_t (* rcl_get_info_by_topic_func)( + const rcl_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_info_array_t * info_array); + +static PyObject * +_get_info_by_topic( + PyObject * args, const char * type, + rcl_get_info_by_topic_func rcl_get_info_by_topic) +{ + PyObject * pynode; + const char * topic_name; + PyObject * pyno_mangle; + + if (!PyArg_ParseTuple(args, "OsO", &pynode, &topic_name, &pyno_mangle)) { + return NULL; + } + + rcl_node_t * node = (rcl_node_t *)PyCapsule_GetPointer(pynode, "rcl_node_t"); + if (!node) { + return NULL; + } + bool no_mangle = PyObject_IsTrue(pyno_mangle); + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + rmw_topic_info_array_t info_array = rmw_get_zero_initialized_topic_info_array(); + rcl_ret_t ret = rcl_get_info_by_topic(node, &allocator, topic_name, no_mangle, &info_array); + rmw_ret_t fini_ret; + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, "Failed to get information by topic for %s: %s", + type, rcl_get_error_string().str); + rcl_reset_error(); + fini_ret = rmw_topic_info_array_fini(&allocator, &info_array); + if (fini_ret != RMW_RET_OK) { + PyErr_Format(PyExc_RuntimeError, "rmw_topic_info_array_fini failed."); + rmw_reset_error(); + } + return NULL; + } + PyObject * py_info_array = rclpy_convert_to_py_topic_info_list(&info_array); + fini_ret = rmw_topic_info_array_fini(&allocator, &info_array); + if (fini_ret != RMW_RET_OK) { + PyErr_Format(PyExc_RuntimeError, "rmw_topic_info_array_fini failed."); + rmw_reset_error(); + return NULL; + } + return py_info_array; +} + +/// Returns a list of publishers, publishing to a topic +/// The returned publisher information includes node name, node namespace, +/// topic type, gid and qos profile +/** + * + * \param[in] pynode Capsule pointing to the node to get the namespace from. + * \param[in] topic_name the topic name to get the publishers for. + * \param[in] no_mangle if true the given topic name will be + * expanded to its fully qualified name. + * \return list of publishers + */ +static PyObject * +rclpy_get_publishers_info_by_topic(PyObject * Py_UNUSED(self), PyObject * args) +{ + return _get_info_by_topic(args, "publishers", rcl_get_publishers_info_by_topic); +} + +/// Returns a list of subscriptions to a topic +/// The returned subscription information includes node name, node namespace, +/// topic type, gid and qos profile +/** + * + * \param[in] pynode Capsule pointing to the node to get the namespace from. + * \param[in] topic_name the topic name to get the subscriptions for. + * \param[in] no_mangle if true the given topic name will be + * expanded to its fully qualified name. + * \return list of subscriptions. + */ +static PyObject * +rclpy_get_subscriptions_info_by_topic(PyObject * Py_UNUSED(self), PyObject * args) +{ + return _get_info_by_topic(args, "subscriptions", rcl_get_subscriptions_info_by_topic); +} + + /// Validate a topic name and return error message and index of invalidation. /** * Does not have to be a fully qualified topic name. @@ -4750,6 +4836,14 @@ static PyMethodDef rclpy_methods[] = { "rclpy_count_subscribers", rclpy_count_subscribers, METH_VARARGS, "Count subscribers for a topic." }, + { + "rclpy_get_publishers_info_by_topic", rclpy_get_publishers_info_by_topic, METH_VARARGS, + "Get a list of publishers for a topic." + }, + { + "rclpy_get_subscriptions_info_by_topic", rclpy_get_subscriptions_info_by_topic, METH_VARARGS, + "Get a list of subscriptions for a topic." + }, { "rclpy_expand_topic_name", rclpy_expand_topic_name, METH_VARARGS, "Expand a topic name." diff --git a/rclpy/src/rclpy_common/include/rclpy_common/common.h b/rclpy/src/rclpy_common/include/rclpy_common/common.h index 7211db2a0..b5023f314 100644 --- a/rclpy/src/rclpy_common/include/rclpy_common/common.h +++ b/rclpy/src/rclpy_common/include/rclpy_common/common.h @@ -123,4 +123,13 @@ RCLPY_COMMON_PUBLIC PyObject * rclpy_convert_to_py(void * message, PyObject * pyclass); +/// Convert a C rmw_topic_info_array_t into a Python list. +/** + * \param[in] info_array a pointer to a rmw_topic_info_array_t + * \return Python list + */ +RCLPY_COMMON_PUBLIC +PyObject * +rclpy_convert_to_py_topic_info_list(const rmw_topic_info_array_t * info_array); + #endif // RCLPY_COMMON__COMMON_H_ diff --git a/rclpy/src/rclpy_common/src/common.c b/rclpy/src/rclpy_common/src/common.c index b8f11c92f..a6da78114 100644 --- a/rclpy/src/rclpy_common/src/common.c +++ b/rclpy/src/rclpy_common/src/common.c @@ -338,3 +338,96 @@ rclpy_convert_to_py(void * message, PyObject * pyclass) } return convert(message); } + +PyObject * +rclpy_convert_to_py_topic_info_list(const rmw_topic_info_array_t * info_array) +{ + if (!info_array) { + return NULL; + } + + PyObject * py_info_array = PyList_New(info_array->count); + if (!py_info_array) { + return NULL; + } + + for (size_t i = 0; i < info_array->count; ++i) { + rmw_topic_info_t topic_info = info_array->info_array[i]; + const rmw_qos_profile_t * qos_profile = topic_info.qos_profile; + PyObject * py_qos_profile = rclpy_common_convert_to_qos_dict(qos_profile); + if (!py_qos_profile) { + Py_DECREF(py_info_array); + return NULL; + } + const char * node_name = topic_info.node_name; + PyObject * py_node_name = PyUnicode_FromString(node_name); + if (!py_node_name) { + Py_DECREF(py_info_array); + Py_DECREF(py_qos_profile); + return NULL; + } + + const char * node_namespace = topic_info.node_namespace; + PyObject * py_node_namespace = PyUnicode_FromString(node_namespace); + if (!py_node_namespace) { + Py_DECREF(py_info_array); + Py_DECREF(py_qos_profile); + Py_DECREF(py_node_name); + return NULL; + } + + const char * topic_type = topic_info.topic_type; + PyObject * py_topic_type = PyUnicode_FromString(topic_type); + if (!py_topic_type) { + Py_DECREF(py_info_array); + Py_DECREF(py_qos_profile); + Py_DECREF(py_node_name); + Py_DECREF(py_node_namespace); + return NULL; + } + + const char * gid = topic_info.gid; + PyObject * py_gid = PyUnicode_FromString(gid); + if (!py_gid) { + Py_DECREF(py_info_array); + Py_DECREF(py_qos_profile); + Py_DECREF(py_node_name); + Py_DECREF(py_node_namespace); + Py_DECREF(py_topic_type); + return NULL; + } + + // Create dictionary that represents rmw_topic_info_t + PyObject * py_topic_info_dict = PyDict_New(); + if (!py_topic_info_dict) { + Py_DECREF(py_info_array); + Py_DECREF(py_qos_profile); + Py_DECREF(py_node_name); + Py_DECREF(py_node_namespace); + Py_DECREF(py_topic_type); + Py_DECREF(py_gid); + return NULL; + } + // Populate keyword arguments + // A success returns 0, and a failure returns -1 + int set_result = 0; + set_result += PyDict_SetItemString(py_topic_info_dict, "qos_profile", py_qos_profile); + set_result += PyDict_SetItemString(py_topic_info_dict, "node_name", py_node_name); + set_result += PyDict_SetItemString(py_topic_info_dict, "node_namespace", py_node_namespace); + set_result += PyDict_SetItemString(py_topic_info_dict, "topic_type", py_topic_type); + set_result += PyDict_SetItemString(py_topic_info_dict, "gid", py_gid); + if (set_result != 0) { + Py_DECREF(py_info_array); + Py_DECREF(py_qos_profile); + Py_DECREF(py_node_name); + Py_DECREF(py_node_namespace); + Py_DECREF(py_topic_type); + Py_DECREF(py_gid); + Py_DECREF(py_topic_info_dict); + return NULL; + } + // add this dict to the list + PyList_SET_ITEM(py_info_array, i, py_topic_info_dict); + } + return py_info_array; +} diff --git a/rclpy/test/test_node.py b/rclpy/test/test_node.py index 6fd9c4e52..aba17a54e 100644 --- a/rclpy/test/test_node.py +++ b/rclpy/test/test_node.py @@ -25,6 +25,7 @@ from rcl_interfaces.srv import GetParameters import rclpy from rclpy.clock import ClockType +from rclpy.duration import Duration from rclpy.exceptions import InvalidParameterException from rclpy.exceptions import InvalidParameterValueException from rclpy.exceptions import InvalidServiceNameException @@ -35,6 +36,11 @@ from rclpy.executors import SingleThreadedExecutor from rclpy.parameter import Parameter from rclpy.qos import qos_profile_sensor_data +from rclpy.qos import QoSDurabilityPolicy +from rclpy.qos import QoSHistoryPolicy +from rclpy.qos import QoSLivelinessPolicy +from rclpy.qos import QoSProfile +from rclpy.qos import QoSReliabilityPolicy from rclpy.time_source import USE_SIM_TIME_NAME from test_msgs.msg import BasicTypes @@ -44,6 +50,17 @@ TEST_RESOURCES_DIR = pathlib.Path(__file__).resolve().parent / 'resources' / 'test_node' +def are_qos_equal(qos_profile, qos_dict): + # Depth and history are skipped because they are not retrieved. + return qos_profile.durability == qos_dict['durability'] and \ + qos_profile.reliability == qos_dict['reliability'] and \ + qos_profile.deadline == qos_dict['deadline'] and \ + qos_profile.lifespan == qos_dict['lifespan'] and \ + qos_profile.liveliness == qos_dict['liveliness'] and \ + qos_profile.liveliness_lease_duration == \ + qos_dict['liveliness_lease_duration'] + + class TestNodeAllowUndeclaredParameters(unittest.TestCase): @classmethod @@ -171,6 +188,72 @@ def test_node_names_and_namespaces(self): # test that it doesn't raise self.node.get_node_names_and_namespaces() + def test_get_publishers_subscriptions_info_by_topic(self): + topic_name = 'test_topic_info' + fq_topic_name = '{namespace}/{name}'.format(namespace=TEST_NAMESPACE, name=topic_name) + # Lists should be empty + self.assertFalse(self.node.get_publishers_info_by_topic(fq_topic_name)) + self.assertFalse(self.node.get_subscriptions_info_by_topic(fq_topic_name)) + + # Add a publisher + qos_profile = QoSProfile( + depth=10, + history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_ALL, + deadline=Duration(seconds=1, nanoseconds=12345), + lifespan=Duration(seconds=20, nanoseconds=9887665), + reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, + durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL, + liveliness_lease_duration=Duration(seconds=5, nanoseconds=23456), + liveliness=QoSLivelinessPolicy.MANUAL_BY_TOPIC) + self.node.create_publisher(BasicTypes, topic_name, qos_profile) + # List should have one item + publisher_list = self.node.get_publishers_info_by_topic(fq_topic_name) + self.assertEqual(1, len(publisher_list)) + # Subscription list should be empty + self.assertFalse(self.node.get_subscriptions_info_by_topic(fq_topic_name)) + # Verify publisher list has the right data + # self.assertEqual(self.node.get_name(), publisher_list[0].get('node_name')) + # self.assertEqual(self.node.get_namespace(), publisher_list[0].get('node_namespace')) + self.assertEqual('test_msgs::msg::dds_::BasicTypes_', publisher_list[0].get('topic_type')) + actual_qos_profile = publisher_list[0].get('qos_profile') + self.assertTrue(are_qos_equal(qos_profile, actual_qos_profile)) + + # Add a subscription + qos_profile2 = QoSProfile( + depth=0, + history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST, + deadline=Duration(seconds=15, nanoseconds=1678), + lifespan=Duration(seconds=29, nanoseconds=2345), + reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE, + durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE, + liveliness_lease_duration=Duration(seconds=5, nanoseconds=23456), + liveliness=QoSLivelinessPolicy.MANUAL_BY_NODE) + self.node.create_subscription(BasicTypes, topic_name, lambda msg: print(msg), qos_profile2) + # Both lists should have one item + publisher_list = self.node.get_publishers_info_by_topic(fq_topic_name) + subscription_list = self.node.get_subscriptions_info_by_topic(fq_topic_name) + self.assertEqual(1, len(publisher_list)) + self.assertEqual(1, len(subscription_list)) + # Verify subscription list has the right data + # self.assertEqual(self.node.get_name(), publisher_list[0].get('node_name')) + # self.assertEqual(self.node.get_namespace(), publisher_list[0].get('node_namespace')) + self.assertEqual('test_msgs::msg::dds_::BasicTypes_', + publisher_list[0].get('topic_type')) + self.assertEqual('test_msgs::msg::dds_::BasicTypes_', + subscription_list[0].get('topic_type')) + publisher_qos_profile = publisher_list[0].get('qos_profile') + subscription_qos_profile = subscription_list[0].get('qos_profile') + self.assertTrue(are_qos_equal(qos_profile, publisher_qos_profile)) + self.assertTrue(are_qos_equal(qos_profile2, subscription_qos_profile)) + + # Error cases + with self.assertRaisesRegex(TypeError, 'bad argument type for built-in operation'): + self.node.get_subscriptions_info_by_topic(1) + self.node.get_publishers_info_by_topic(1) + with self.assertRaisesRegex(ValueError, 'is invalid'): + self.node.get_subscriptions_info_by_topic('13') + self.node.get_publishers_info_by_topic('13') + def test_count_publishers_subscribers(self): short_topic_name = 'chatter' fq_topic_name = '%s/%s' % (TEST_NAMESPACE, short_topic_name)