Skip to content

Commit 32d4aa5

Browse files
committed
Fix async
Signed-off-by: methylDragon <methylDragon@gmail.com>
1 parent bcd20f7 commit 32d4aa5

11 files changed

+271
-112
lines changed

fuse_core/include/fuse_core/async_motion_model.hpp

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ namespace fuse_core
4747
{
4848

4949
/**
50-
* @brief A motion model base class that provides node handles and a private callback queue.
50+
* @brief A motion model base class that provides an internal node and an internal callback queue.
5151
*
5252
* A model model plugin is responsible for generating constraints that link together timestamps
5353
* introduced by other sensors in the system. The AsyncMotionModel class is designed similar to a
@@ -56,11 +56,11 @@ namespace fuse_core
5656
* There are a few notable differences between the AsyncMotionModel class and a standard ROS
5757
* node. First and most obvious, the AsyncMotionModel class is designed as a plugin, with all of
5858
* the stipulations and requirements that come with all ROS plugins (must be derived from a known
59-
* base class, will be default constructed). Second, the AsyncMotionModel class provides a global
60-
* and private node handle, both hooked to a local callback queue and local spinner. This makes
59+
* base class, will be default constructed). Second, the AsyncMotionModel class provides an internal
60+
* node that is hooked to a local callback queue and local executor on init. This makes
6161
* it act like a full ROS node -- subscriptions trigger message callbacks, callbacks will fire
6262
* sequentially, etc. However, authors of derived classes should be aware of this fact and avoid
63-
* creating additional node handles, or at least take care when creating new node handles and
63+
* creating additional sub-nodes, or at least take care when creating new sub-nodes and
6464
* additional callback queues. Finally, the interaction of motion model nodes is best compared to
6565
* a service call -- an external actor will provide a set of timestamps and wait for the motion
6666
* model to respond with the required set of constraints to link the timestamps together (along
@@ -120,9 +120,9 @@ class AsyncMotionModel : public MotionModel
120120
* This method will be called by the optimizer, in the optimizer's thread, after each Graph
121121
* update is complete. This implementation repackages the provided \p graph, and inserts a
122122
* call to onGraphUpdate() into this motion model's callback queue. This is meant to simplify
123-
* thread synchronization. If this motion model uses a single-threaded spinner, then all
123+
* thread synchronization. If this motion model uses a single-threaded executor, then all
124124
* callbacks will fire sequentially and no semaphores are needed. If this motion model uses a
125-
* multi-threaded spinner, then normal multithreading rules apply and data accessed in more
125+
* multi-threaded executor, then normal multithreading rules apply and data accessed in more
126126
* than one place should be guarded.
127127
*
128128
* @param[in] graph A read-only pointer to the graph object, allowing queries to be performed
@@ -135,9 +135,9 @@ class AsyncMotionModel : public MotionModel
135135
* reading from the parameter server.
136136
*
137137
* This will be called for each plugin after construction and after the ros node has been
138-
* initialized. The provided private node handle will be in a namespace based on the plugin's
139-
* name. This should prevent conflicts and allow the same plugin to be used multiple times
140-
* with different settings and topics.
138+
* initialized. The provided node will be in a namespace based on the plugin's name. This should
139+
* prevent conflicts and allow the same plugin to be used multiple times with different settings
140+
* and topics.
141141
*
142142
* @param[in] name A unique name to give this plugin instance
143143
*/
@@ -159,9 +159,9 @@ class AsyncMotionModel : public MotionModel
159159
*
160160
* This implementation inserts a call to onStart() into this motion model's callback queue.
161161
* This method then blocks until the call to onStart() has completed. This is meant to
162-
* simplify thread synchronization. If this motion model uses a single-threaded spinner, then
162+
* simplify thread synchronization. If this motion model uses a single-threaded executor, then
163163
* all callbacks will fire sequentially and no semaphores are needed. If this motion model
164-
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
164+
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
165165
* more than one place should be guarded.
166166
*/
167167
void start() override;
@@ -178,9 +178,9 @@ class AsyncMotionModel : public MotionModel
178178
*
179179
* This implementation inserts a call to onStop() into this motion model's callback queue.
180180
* This method then blocks until the call to onStop() has completed. This is meant to
181-
* simplify thread synchronization. If this motion model uses a single-threaded spinner, then
181+
* simplify thread synchronization. If this motion model uses a single-threaded executor, then
182182
* all callbacks will fire sequentially and no semaphores are needed. If this motion model
183-
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
183+
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
184184
* more than one place should be guarded.
185185
*/
186186
void stop() override;
@@ -192,16 +192,17 @@ class AsyncMotionModel : public MotionModel
192192
std::string name_; //!< The unique name for this motion model instance
193193
rclcpp::Node::SharedPtr node_; //!< The node for this motion model
194194

195-
//! A single/multi-threaded spinner assigned to the local callback queue
195+
//! A single/multi-threaded executor assigned to the local callback queue
196196
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
197197

198198
size_t executor_thread_count_;
199199
std::thread spinner_; //!< Internal thread for spinning the executor
200+
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized
200201

201202
/**
202203
* @brief Constructor
203204
*
204-
* Construct a new motion model and create a local callback queue and thread spinner.
205+
* Construct a new motion model and create a local callback queue and internal executor.
205206
*
206207
* @param[in] thread_count The number of threads used to service the local callback queue
207208
*/
@@ -250,7 +251,7 @@ class AsyncMotionModel : public MotionModel
250251
* @brief Perform any required initialization for the motion model
251252
*
252253
* This could include things like reading from the parameter server or subscribing to topics.
253-
* The class's node handles will be properly initialized before onInit() is called. Spinning
254+
* The class's node will be properly initialized before onInit() is called. Spinning
254255
* of the callback queue will not begin until after the call to onInit() completes.
255256
*/
256257
virtual void onInit() {}

fuse_core/include/fuse_core/async_publisher.hpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,16 @@ namespace fuse_core
5050
{
5151

5252
/**
53-
* @brief A publisher base class that provides node handles attached to an internal callback queue
54-
* serviced by a local thread (or threads) using a spinner.
53+
* @brief A publisher base class that provides an internal callback queue attached to a ROS 2 Node
54+
* serviced by a local executor.
5555
*
5656
* This allows publishers derived from this base class to operate similarly to a stand alone node
5757
* -- any subscription or service callbacks will be executed within an independent thread. The
5858
* notable differences include:
5959
* - a default constructor is required (because this is a plugin)
6060
* - subscriptions, services, parameter server interactions, etc. should be placed in the onInit()
6161
* call instead of in the constructor
62-
* - a global node handle and private node handle have already been created and attached to a local
63-
* callback queue
62+
* - a node has already been created and attached to a local callback queue
6463
* - a special callback, notifyCallback(), will be fired every time the optimizer completes an
6564
* optimization cycle
6665
*/
@@ -77,10 +76,10 @@ class AsyncPublisher : public Publisher
7776
/**
7877
* @brief Initialize the AsyncPublisher object
7978
*
80-
* This will store the provided name and graph object, and create a global and private node
81-
* handle that both use an internal callback queue serviced by a local thread. The
82-
* AsyncPublisher::onInit() method will be called from here, once the node handles are
83-
* properly configured.
79+
* This will store the provided name and graph object, and create an internal node for this
80+
* instance that will use an internal callback queue serviced by a local thread. The
81+
* AsyncPublisher::onInit() method will be called from here, once the internal node is properly
82+
* configured.
8483
*
8584
* @param[in] name A unique name to give this plugin instance
8685
*/
@@ -120,9 +119,9 @@ class AsyncPublisher : public Publisher
120119
*
121120
* This implementation inserts a call to onStart() into this publisher's callback queue. This
122121
* method then blocks until the call to onStart() has completed. This is meant to simplify
123-
* thread synchronization. If this publisher uses a single-threaded spinner, then all
122+
* thread synchronization. If this publisher uses a single-threaded executor, then all
124123
* callbacks will fire sequentially and no semaphores are needed. If this publisher uses a
125-
* multithreaded spinner, then normal multithreading rules apply and data accessed in more
124+
* multithreaded executor, then normal multithreading rules apply and data accessed in more
126125
* than one place should be guarded.
127126
*/
128127
void start() override;
@@ -139,7 +138,7 @@ class AsyncPublisher : public Publisher
139138
*
140139
* This implementation inserts a call to onStop() into this publisher's callback queue. This
141140
* method then blocks until the call to onStop() has completed. This is meant to simplify
142-
* thread synchronization. If this publisher uses a single-threaded spinner, then all
141+
* thread synchronization. If this publisher uses a single-threaded executor, then all
143142
* callbacks will fire sequentially and no semaphores are needed. If this publisher uses a
144143
* multithreaded spinner, then normal multithreading rules apply and data accessed in more
145144
* than one place should be guarded.
@@ -153,15 +152,16 @@ class AsyncPublisher : public Publisher
153152
std::string name_; //!< The unique name for this publisher instance
154153
rclcpp::Node::SharedPtr node_; //!< The node for this publisher
155154

156-
//! A single/multi-threaded spinner assigned to the local callback queue
155+
//! A single/multi-threaded executor assigned to the local callback queue
157156
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
158157
size_t executor_thread_count_;
159158
std::thread spinner_; //!< Internal thread for spinning the executor
159+
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized
160160

161161
/**
162162
* @brief Constructor
163163
*
164-
* Constructs a new publisher with node handles, a local callback queue, and thread spinner.
164+
* Constructs a new publisher with a local node, a local callback queue, and internal executor.
165165
*
166166
* @param[in] thread_count The number of threads used to service the local callback queue
167167
*/

fuse_core/include/fuse_core/async_sensor_model.hpp

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,22 @@ namespace fuse_core
4747
{
4848

4949
/**
50-
* @brief A sensor model base class that provides node handles and a private callback queue.
50+
* @brief A sensor model base class that provides an internal node and an internal callback queue.
5151
*
5252
* A sensor model plugin is responsible for generating new constraints, packaging them in a
5353
* fuse_core::Transaction object, and passing them along to the optimizer. The asynchronous
54-
* sensor model plugin is designed similar to the nodelet plugin, attempting to be as generic and
54+
* sensor model plugin is designed similar to a nodelet plugin, attempting to be as generic and
5555
* easy to use as a standard ROS node.
5656
*
5757
* There are a few notable differences between the asynchronous sensor model plugin and a
5858
* standard ROS node. First and most obvious, the sensor model is designed as a plugin, with all
5959
* of the stipulations and requirements that come with all ROS plugins (must be derived from a
6060
* known base class, will be default constructed). Second, the base AsyncSensorModel class
61-
* provides a global and private node handle, both hooked to a local callback queue and local
62-
* spinner. This makes it act like a full ROS node -- subscriptions trigger message callbacks,
61+
* provides an internal node that is hooked to a local callback queue and local executor on
62+
* init. This makes it act like a full ROS node -- subscriptions trigger message callbacks,
6363
* callbacks will fire sequentially, etc. However, authors of derived sensor models should be
64-
* aware of this fact and avoid creating additional node handles, or at least take care when
65-
* creating new node handles and additional callback queues. Finally, instead of publishing the
64+
* aware of this fact and avoid creating additional sub-nodes, or at least take care when
65+
* creating new sub-nodes and additional callback queues. Finally, instead of publishing the
6666
* generated constraints using a ROS publisher, the asynchronous sensor model provides a
6767
* "sendTransaction()" method that should be called whenever the sensor is ready to send a
6868
* fuse_core::Transaction object to the Optimizer. Under the hood, this executes the
@@ -103,9 +103,9 @@ class AsyncSensorModel : public SensorModel
103103
* This method will be called by the optimizer, in the optimizer's thread, after each Graph
104104
* update is complete. This implementation repackages the provided \p graph, and inserts a
105105
* call to onGraphUpdate() into this sensor's callback queue. This is meant to simplify
106-
* thread synchronization. If this sensor uses a single-threaded spinner, then all callbacks
106+
* thread synchronization. If this sensor uses a single-threaded executor, then all callbacks
107107
* will fire sequentially and no semaphores are needed. If this sensor uses a multi-threaded
108-
* spinner, then normal multithreading rules apply and data accessed in more than one place
108+
* executor, then normal multithreading rules apply and data accessed in more than one place
109109
* should be guarded.
110110
*
111111
* @param[in] graph A read-only pointer to the graph object, allowing queries to be performed
@@ -118,9 +118,9 @@ class AsyncSensorModel : public SensorModel
118118
* reading from the parameter server.
119119
*
120120
* This will be called for each plugin after construction and after the ROS node has been
121-
* initialized. The provided private node handle will be in a namespace based on the plugin's
122-
* name. This should prevent conflicts and allow the same plugin to be used multiple times
123-
* with different settings and topics.
121+
* initialized. The provided node will be in a namespace based on the plugin's name. This should
122+
* prevent conflicts and allow the same plugin to be used multiple times with different settings
123+
* and topics.
124124
*
125125
* @param[in] name A unique name to give this plugin instance
126126
* @param[in] transaction_callback The function to call every time a transaction is published
@@ -162,9 +162,9 @@ class AsyncSensorModel : public SensorModel
162162
*
163163
* This implementation inserts a call to onStart() into this sensor model's callback queue.
164164
* This method then blocks until the call to onStart() has completed. This is meant to
165-
* simplify thread synchronization. If this sensor model uses a single-threaded spinner, then
165+
* simplify thread synchronization. If this sensor model uses a single-threaded executor, then
166166
* all callbacks will fire sequentially and no semaphores are needed. If this sensor model
167-
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
167+
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
168168
* more than one place should be guarded.
169169
*/
170170
void start() override;
@@ -180,9 +180,9 @@ class AsyncSensorModel : public SensorModel
180180
*
181181
* This implementation inserts a call to onStop() into this sensor model's callback queue.
182182
* This method then blocks until the call to onStop() has completed. This is meant to
183-
* simplify thread synchronization. If this sensor model uses a single-threaded spinner, then
183+
* simplify thread synchronization. If this sensor model uses a single-threaded executor, then
184184
* all callbacks will fire sequentially and no semaphores are needed. If this sensor model
185-
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
185+
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
186186
* more than one place should be guarded.
187187
*/
188188
void stop() override;
@@ -194,18 +194,19 @@ class AsyncSensorModel : public SensorModel
194194
std::string name_; //!< The unique name for this sensor model instance
195195
rclcpp::Node::SharedPtr node_; //!< The node for this sensor model
196196

197-
//! A single/multi-threaded spinner assigned to the local callback queue
197+
//! A single/multi-threaded executor assigned to the local callback queue
198198
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
199199

200200
//! The function to be executed every time a Transaction is "published"
201201
TransactionCallback transaction_callback_;
202202
size_t executor_thread_count_;
203203
std::thread spinner_; //!< Internal thread for spinning the executor
204+
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized
204205

205206
/**
206207
* @brief Constructor
207208
*
208-
* Construct a new sensor model and create a local callback queue and thread spinner.
209+
* Construct a new sensor model and create a local callback queue and internal executor.
209210
*
210211
* @param[in] thread_count The number of threads used to service the local callback queue
211212
*/
@@ -234,7 +235,7 @@ class AsyncSensorModel : public SensorModel
234235
* @brief Perform any required initialization for the sensor model
235236
*
236237
* This could include things like reading from the parameter server or subscribing to topics.
237-
* The class's node handles will be properly initialized before onInit() is called. Spinning
238+
* The class's node will be properly initialized before onInit() is called. Spinning
238239
* of the callback queue will not begin until after the call to onInit() completes.
239240
*
240241
* Derived sensor models classes must implement this function, because otherwise I'm not sure

fuse_core/include/fuse_core/callback_wrapper.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ class CallbackAdapter : public rclcpp::Waitable
188188

189189
void removeAllCallbacks();
190190

191+
void triggerGuardCondition();
192+
191193
private:
192194
rcl_guard_condition_t gc_; //!< guard condition to drive the waitable
193195

0 commit comments

Comments
 (0)