Skip to content

Commit

Permalink
Use oneapi::tbb::task_group instead of tbb::task if available
Browse files Browse the repository at this point in the history
As tbb::task has been removed in oneTBB 2021.01, we need to replace its use with oneapi::tbb::task_group. Define a wrapper so that tbb::task_group is used for newer versions of oneTBB.

Fixes gazebosim#2867.

Signed-off-by: Alex Dewar <alex.dewar@gmx.co.uk>
  • Loading branch information
alexdewar committed Dec 13, 2021
1 parent 215251b commit 440217d
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 36 deletions.
1 change: 1 addition & 0 deletions gazebo/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set (headers
SubscribeOptions.hh
Subscriber.hh
SubscriptionTransport.hh
TaskGroup.hh
TopicManager.hh
TransportIface.hh
TransportTypes.hh
Expand Down
25 changes: 10 additions & 15 deletions gazebo/transport/Connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#ifndef _CONNECTION_HH_
#define _CONNECTION_HH_

#include <tbb/task.h>
#include <google/protobuf/message.h>

#include <boost/asio.hpp>
Expand All @@ -37,6 +36,7 @@
#include "gazebo/common/Console.hh"
#include "gazebo/common/Exception.hh"
#include "gazebo/common/WeakBind.hh"
#include "gazebo/transport/TaskGroup.hh"
#include "gazebo/util/system.hh"

#define HEADER_LENGTH 8
Expand All @@ -54,7 +54,7 @@ namespace gazebo
/// \cond
/// \brief A task instance that is created when data is read from
/// a socket and used by TBB
class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
class GZ_TRANSPORT_VISIBLE ConnectionReadTask
{
/// \brief Constructor
/// \param[_in] _func Boost function pointer, which is the function
Expand All @@ -68,13 +68,9 @@ namespace gazebo
{
}

/// \bried Overridden function from tbb::task that exectues the data
/// callback.
public: tbb::task *execute()
{
this->func(this->data);
return NULL;
}
/// \brief Execute the data callback
public: void operator()() const
{ this->func(this->data); }

/// \brief The boost function pointer
private: boost::function<void (const std::string &)> func;
Expand Down Expand Up @@ -310,12 +306,7 @@ namespace gazebo

if (!_e && !transport::is_stopped())
{
ConnectionReadTask *task = new(tbb::task::allocate_root())
ConnectionReadTask(boost::get<0>(_handler), data);
tbb::task::enqueue(*task);

// Non-tbb version:
// boost::get<0>(_handler)(data);
this->taskGroup.run<ConnectionReadTask>(boost::get<0>(_handler), data);
}
}

Expand Down Expand Up @@ -465,6 +456,10 @@ namespace gazebo

/// \brief True if the connection is open.
private: bool isOpen;


/// \brief For managing asynchronous tasks with tbb
private: TaskGroup taskGroup;
};
/// \}
}
Expand Down
13 changes: 4 additions & 9 deletions gazebo/transport/ConnectionManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,15 @@ using namespace gazebo;
using namespace transport;

/// TBB task to establish subscriber to publisher connection.
class TopicManagerConnectionTask : public tbb::task
class TopicManagerConnectionTask
{
/// \brief Constructor.
/// \param[in] _pub Publish message
public: explicit TopicManagerConnectionTask(msgs::Publish _pub) : pub(_pub) {}

/// Implements the necessary execute function
public: tbb::task *execute()
{
TopicManager::Instance()->ConnectSubToPub(pub);
return NULL;
}
public: void operator()() const
{ TopicManager::Instance()->ConnectSubToPub(pub); }

/// \brief Publish message
private: msgs::Publish pub;
Expand Down Expand Up @@ -385,9 +382,7 @@ void ConnectionManager::ProcessMessage(const std::string &_data)
if (pub.host() != this->serverConn->GetLocalAddress() ||
pub.port() != this->serverConn->GetLocalPort())
{
TopicManagerConnectionTask *task = new(tbb::task::allocate_root())
TopicManagerConnectionTask(pub);
tbb::task::enqueue(*task);
this->taskGroup.run<TopicManagerConnectionTask>(pub);
}
}
// publisher_subscribe. This occurs when we try to subscribe to a topic, and
Expand Down
6 changes: 5 additions & 1 deletion gazebo/transport/ConnectionManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
#include "gazebo/msgs/msgs.hh"
#include "gazebo/common/SingletonT.hh"

#include "gazebo/transport/Publisher.hh"
#include "gazebo/transport/Connection.hh"
#include "gazebo/transport/Publisher.hh"
#include "gazebo/transport/TaskGroup.hh"
#include "gazebo/util/system.hh"

/// \brief Explicit instantiation for typed SingletonT.
Expand Down Expand Up @@ -193,6 +194,9 @@ namespace gazebo

/// \brief Condition used for synchronization
private: boost::condition_variable namespaceCondition;

/// \brief For managing asynchronous tasks with tbb
private: TaskGroup taskGroup;

// Singleton implementation
private: friend class SingletonT<ConnectionManager>;
Expand Down
19 changes: 8 additions & 11 deletions gazebo/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#ifndef GAZEBO_TRANSPORT_NODE_HH_
#define GAZEBO_TRANSPORT_NODE_HH_

#include <tbb/task.h>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <map>
#include <list>
#include <string>
#include <vector>

#include "gazebo/transport/TaskGroup.hh"
#include "gazebo/transport/TransportTypes.hh"
#include "gazebo/transport/TopicManager.hh"
#include "gazebo/util/system.hh"
Expand All @@ -36,7 +36,7 @@ namespace gazebo
{
/// \cond
/// \brief Task used by Node::Publish to publish on a one-time publisher
class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
class GZ_TRANSPORT_VISIBLE PublishTask
{
/// \brief Constructor
/// \param[in] _pub Publisher to publish the message on.
Expand All @@ -49,16 +49,14 @@ namespace gazebo
this->msg->CopyFrom(_message);
}

/// \brief Overridden function from tbb::task that exectues the
/// publish task.
public: tbb::task *execute()
/// \brief Executes the publish task.
public: void operator()()
{
this->pub->WaitForConnection();
this->pub->Publish(*this->msg, true);
this->pub->SendMessage();
delete this->msg;
this->pub.reset();
return NULL;
}

/// \brief Pointer to the publisher.
Expand Down Expand Up @@ -159,11 +157,7 @@ namespace gazebo
const google::protobuf::Message &_message)
{
transport::PublisherPtr pub = this->Advertise<M>(_topic);
PublishTask *task = new(tbb::task::allocate_root())
PublishTask(pub, _message);

tbb::task::enqueue(*task);
return;
this->taskGroup.run<PublishTask>(pub, _message);
}

/// \brief Advertise a topic
Expand Down Expand Up @@ -418,6 +412,9 @@ namespace gazebo

/// \brief List of newly arrive messages
private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;

/// \brief For managing asynchronous tasks with tbb
private: TaskGroup taskGroup;

private: boost::mutex publisherMutex;
private: boost::mutex publisherDeleteMutex;
Expand Down
84 changes: 84 additions & 0 deletions gazebo/transport/TaskGroup.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (C) 2021 Alex Dewar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef _TASK_GROUP_HH_
#define _TASK_GROUP_HH_

#include <utility>

#if __has_include(<oneapi/tbb/task_group.h>)

// Emit is both a macro in Qt and a function defined by tbb
#undef emit
#include <oneapi/tbb/task_group.h>
#define emit

namespace gazebo {
namespace transport {
class TaskGroup
{
public: ~TaskGroup() noexcept
{
// Wait for running tasks to finish
this->taskGroup.wait();
}

public: template<class Functor, class... Args> void run(Args&&... args)
{
this->taskGroup.run(Functor(std::forward<Args>(args)...));
}

private: oneapi::tbb::task_group taskGroup;
};
}
}
#else
#include <tbb/task.h>

namespace gazebo {
namespace transport {
class TaskGroup
{
/// \brief A helper class which provides the requisite execute() method
/// required by tbb.
private: template<class T> class TaskWrapper : public tbb::task
{
public: template<class... Args> TaskWrapper<T>(Args&&... args)
: functor(std::forward<Args>(args)...)
{
}

public: tbb::task *execute()
{
this->functor();
return nullptr;
}

private: T functor;
};

public: template<class Functor, class... Args> void run(Args&&... args)
{
auto *task = new (tbb::task::allocate_root())
TaskWrapper<Functor>(std::forward<Args>(args)...);
tbb::task::enqueue(*task);
}
};
}
}

#endif
#endif
6 changes: 6 additions & 0 deletions gazebo/transport/transport_pch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
#include <string>
#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>
#if __has_include(<oneapi/tbb/task_group.h>)
#undef emit
#include <oneapi/tbb/task_group.h>
#define emit
#else
#include <tbb/task.h>
#endif
#include <utility>
#include <vector>

0 comments on commit 440217d

Please sign in to comment.