Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add record, stop, start_discovery, stop_discovery and is_discovery_stopped services for rosbag2_transport::Recorder #1840

Open
wants to merge 9 commits into
base: rolling
Choose a base branch
from
4 changes: 4 additions & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/WriteSplitEvent.msg"
"srv/Burst.srv"
"srv/GetRate.srv"
"srv/IsDiscoveryStopped.srv"
"srv/IsPaused.srv"
"srv/Pause.srv"
"srv/Play.srv"
"srv/PlayNext.srv"
"srv/Record.srv"
"srv/Resume.srv"
"srv/Seek.srv"
"srv/SetRate.srv"
"srv/Snapshot.srv"
"srv/SplitBagfile.srv"
"srv/StartDiscovery.srv"
"srv/Stop.srv"
"srv/StopDiscovery.srv"
"srv/TogglePaused.srv"
DEPENDENCIES builtin_interfaces
ADD_LINTER_TESTS
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_interfaces/srv/IsDiscoveryStopped.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
---
bool stopped
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/Record.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/StartDiscovery.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/StopDiscovery.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---
9 changes: 9 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@

#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_interfaces/srv/is_discovery_stopped.hpp"
#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/record.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_interfaces/srv/split_bagfile.hpp"
#include "rosbag2_interfaces/srv/start_discovery.hpp"
#include "rosbag2_interfaces/srv/stop.hpp"
#include "rosbag2_interfaces/srv/stop_discovery.hpp"

#include "rosbag2_interfaces/msg/write_split_event.hpp"

Expand Down Expand Up @@ -154,6 +159,10 @@ class Recorder : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
bool is_paused();

/// Return the current discovery state
ROSBAG2_TRANSPORT_PUBLIC
bool is_discovery_stopped();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename to the is_discovery_running()


inline constexpr static const auto kPauseResumeToggleKey = KeyboardHandler::KeyCode::SPACE;

protected:
Expand Down
77 changes: 75 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class RecorderImpl
/// Stop discovery
void stop_discovery();

/// Return the current discovery state.
bool is_discovery_stopped();

std::unordered_map<std::string, std::string> get_requested_or_available_topics();

/// Public members for access by wrapper
Expand Down Expand Up @@ -135,15 +138,20 @@ class RecorderImpl
std::string serialization_format_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::IsDiscoveryStopped>::SharedPtr srv_is_discovery_stopped_;
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
rclcpp::Service<rosbag2_interfaces::srv::Record>::SharedPtr srv_record_;
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_;
rclcpp::Service<rosbag2_interfaces::srv::StartDiscovery>::SharedPtr srv_start_discovery_;
rclcpp::Service<rosbag2_interfaces::srv::Stop>::SharedPtr srv_stop_;
rclcpp::Service<rosbag2_interfaces::srv::StopDiscovery>::SharedPtr srv_stop_discovery_;

std::mutex start_stop_transition_mutex_;
std::mutex discovery_mutex_;
std::atomic<bool> stop_discovery_ = false;
std::atomic<bool> stop_discovery_ = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop_discovery_ shall be false by default. Otherwise, we will immediately exit from RecorderImpl::topics_discovery() when running it the first time.

Suggested change
std::atomic<bool> stop_discovery_ = true;
std::atomic<bool> stop_discovery_ = false;

std::atomic_uchar paused_ = 0;
std::atomic<bool> in_recording_ = false;
std::shared_ptr<KeyboardHandler> keyboard_handler_;
Expand Down Expand Up @@ -294,6 +302,56 @@ void RecorderImpl::record()
writer_->split_bagfile();
});

srv_start_discovery_ = node->create_service<rosbag2_interfaces::srv::StartDiscovery>(
"~/start_discovery",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::StartDiscovery::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::StartDiscovery::Response>/* response */)
{
start_discovery();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check if (in_recording_) before calling start discovery. Otherwise, it will be an undefined behavior since the writer may not be opened yet, but discovery can try to create a subscription with a callback calling writer->write(message).

});

srv_stop_discovery_ = node->create_service<rosbag2_interfaces::srv::StopDiscovery>(
"~/stop_discovery",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::StopDiscovery::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::StopDiscovery::Response>/* response */)
{
stop_discovery();
});

srv_is_discovery_stopped_ = node->create_service<rosbag2_interfaces::srv::IsDiscoveryStopped>(
"~/is_discovery_stopped",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::IsDiscoveryStopped::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::IsDiscoveryStopped::Response> response)
{
response->stopped = is_discovery_stopped();
});

srv_record_ = node->create_service<rosbag2_interfaces::srv::Record>(
"~/record",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::Record::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::Record::Response>/* response */)
{
record();
});

srv_stop_ = node->create_service<rosbag2_interfaces::srv::Stop>(
"~/stop",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::Stop::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::Stop::Response>/* response */)
{
stop();
});

srv_pause_ = node->create_service<rosbag2_interfaces::srv::Pause>(
"~/pause",
[this](
Expand Down Expand Up @@ -328,6 +386,10 @@ void RecorderImpl::record()
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);

// Start the thread that will publish events
{
std::lock_guard<std::mutex> lock(event_publisher_thread_mutex_);
event_publisher_thread_should_exit_ = false;
}
event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this);

rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
Expand Down Expand Up @@ -436,7 +498,7 @@ bool RecorderImpl::is_paused()
void RecorderImpl::start_discovery()
{
std::lock_guard<std::mutex> state_lock(discovery_mutex_);
if (stop_discovery_.exchange(false)) {
if (!stop_discovery_.exchange(false)) {
RCLCPP_DEBUG(node->get_logger(), "Recorder topic discovery is already running.");
} else {
discovery_future_ =
Expand Down Expand Up @@ -464,6 +526,11 @@ void RecorderImpl::stop_discovery()
}
}

bool RecorderImpl::is_discovery_stopped()
{
return stop_discovery_.load();
}

void RecorderImpl::topics_discovery()
{
// If using sim time - wait until /clock topic received before even creating subscriptions
Expand Down Expand Up @@ -849,6 +916,12 @@ Recorder::is_paused()
return pimpl_->is_paused();
}

bool
Recorder::is_discovery_stopped()
{
return pimpl_->is_discovery_stopped();
}

std::unordered_map<std::string, std::string>
Recorder::get_requested_or_available_topics()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn

void close() override
{
topics_.clear();
writer_close_called_ = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@

#include "rclcpp/rclcpp.hpp"

#include "rosbag2_interfaces/srv/is_discovery_stopped.hpp"
#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/record.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_interfaces/srv/split_bagfile.hpp"
#include "rosbag2_interfaces/srv/start_discovery.hpp"
#include "rosbag2_interfaces/srv/stop.hpp"
#include "rosbag2_interfaces/srv/stop_discovery.hpp"
#include "rosbag2_transport/recorder.hpp"

#include "rosbag2_test_common/publication_manager.hpp"
Expand All @@ -39,11 +44,16 @@ using namespace ::testing; // NOLINT
class RecordSrvsTest : public RecordIntegrationTestFixture
{
public:
using IsDiscoveryStopped = rosbag2_interfaces::srv::IsDiscoveryStopped;
using IsPaused = rosbag2_interfaces::srv::IsPaused;
using Pause = rosbag2_interfaces::srv::Pause;
using Record = rosbag2_interfaces::srv::Record;
using Resume = rosbag2_interfaces::srv::Resume;
using Snapshot = rosbag2_interfaces::srv::Snapshot;
using SplitBagfile = rosbag2_interfaces::srv::SplitBagfile;
using StartDiscovery = rosbag2_interfaces::srv::StartDiscovery;
using Stop = rosbag2_interfaces::srv::Stop;
using StopDiscovery = rosbag2_interfaces::srv::StopDiscovery;

explicit RecordSrvsTest(bool snapshot_mode = false)
: RecordIntegrationTestFixture(),
Expand Down Expand Up @@ -80,11 +90,17 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
pub_manager.setup_publisher(test_topic_, string_message, 10);

const std::string ns = "/" + recorder_name_;
cli_is_discovery_stopped_ = client_node_->create_client<IsDiscoveryStopped>(ns +
"/is_discovery_stopped");
cli_is_paused_ = client_node_->create_client<IsPaused>(ns + "/is_paused");
cli_pause_ = client_node_->create_client<Pause>(ns + "/pause");
cli_record_ = client_node_->create_client<Record>(ns + "/record");
cli_resume_ = client_node_->create_client<Resume>(ns + "/resume");
cli_snapshot_ = client_node_->create_client<Snapshot>(ns + "/snapshot");
cli_split_bagfile_ = client_node_->create_client<SplitBagfile>(ns + "/split_bagfile");
cli_start_discovery_ = client_node_->create_client<StartDiscovery>(ns + "/start_discovery");
cli_stop_ = client_node_->create_client<Stop>(ns + "/stop");
cli_stop_discovery_ = client_node_->create_client<StopDiscovery>(ns + "/stop_discovery");
exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();

exec_->add_node(recorder_);
Expand Down Expand Up @@ -141,11 +157,16 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

// Service clients
rclcpp::Node::SharedPtr client_node_;
rclcpp::Client<IsDiscoveryStopped>::SharedPtr cli_is_discovery_stopped_;
rclcpp::Client<IsPaused>::SharedPtr cli_is_paused_;
rclcpp::Client<Pause>::SharedPtr cli_pause_;
rclcpp::Client<Record>::SharedPtr cli_record_;
rclcpp::Client<Resume>::SharedPtr cli_resume_;
rclcpp::Client<Snapshot>::SharedPtr cli_snapshot_;
rclcpp::Client<SplitBagfile>::SharedPtr cli_split_bagfile_;
rclcpp::Client<StartDiscovery>::SharedPtr cli_start_discovery_;
rclcpp::Client<Stop>::SharedPtr cli_stop_;
rclcpp::Client<StopDiscovery>::SharedPtr cli_stop_discovery_;

bool snapshot_mode_;
};
Expand Down Expand Up @@ -226,3 +247,37 @@ TEST_F(RecordSrvsTest, pause_resume)
is_paused_response = successful_service_request<IsPaused>(cli_is_paused_);
EXPECT_FALSE(is_paused_response->paused);
}

TEST_F(RecordSrvsTest, stop_start_discovery)
{
EXPECT_FALSE(recorder_->is_discovery_stopped());
auto is_discovery_stopped_response =
successful_service_request<IsDiscoveryStopped>(cli_is_discovery_stopped_);
EXPECT_FALSE(is_discovery_stopped_response->stopped);

successful_service_request<StopDiscovery>(cli_stop_discovery_);
EXPECT_TRUE(recorder_->is_discovery_stopped());
is_discovery_stopped_response =
successful_service_request<IsDiscoveryStopped>(cli_is_discovery_stopped_);
EXPECT_TRUE(is_discovery_stopped_response->stopped);

successful_service_request<StartDiscovery>(cli_start_discovery_);
EXPECT_FALSE(recorder_->is_discovery_stopped());
is_discovery_stopped_response =
successful_service_request<IsDiscoveryStopped>(cli_is_discovery_stopped_);
EXPECT_FALSE(is_discovery_stopped_response->stopped);
}

TEST_F(RecordSrvsTest, record_stop)
{
auto & writer = recorder_->get_writer_handle();
auto & mock_writer = dynamic_cast<MockSequentialWriter &>(writer.get_implementation_handle());

EXPECT_FALSE(mock_writer.closed_was_called());

successful_service_request<Stop>(cli_stop_);
EXPECT_TRUE(mock_writer.closed_was_called());

successful_service_request<Record>(cli_record_);
EXPECT_FALSE(mock_writer.closed_was_called());
}
Comment on lines +271 to +283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add recorder start after the stop and then again stop to make sure that we can start again after the stop

Loading