Skip to content

Commit

Permalink
Rewrite make_callback as a private method, using bind instead of lamb…
Browse files Browse the repository at this point in the history
…da, to reuse in take_snapshot

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Sep 14, 2021
1 parent 55beff6 commit bd852fd
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 42 deletions.
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
virtual std::shared_ptr<rosbag2_storage::SerializedBagMessage>
get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);

private:
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
};

} // namespace writers
Expand Down
70 changes: 28 additions & 42 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_cpp/info.hpp"
#include "rosbag2_cpp/logging.hpp"

#include "rosbag2_storage/storage_options.hpp"

Expand All @@ -42,24 +43,6 @@ std::string strip_parent_path(const std::string & relative_path)
{
return rcpputils::fs::path(relative_path).filename().string();
}

rosbag2_cpp::cache::CacheConsumer::consume_callback_function_t make_callback(
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_interface,
std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_info_map,
std::mutex & topics_mutex)
{
return [callback_interface = storage_interface, &topics_info_map, &topics_mutex](
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs) {
callback_interface->write(msgs);
for (const auto & msg : msgs) {
// count messages as successfully written
std::lock_guard<std::mutex> lock(topics_mutex);
if (topics_info_map.find(msg->topic_name) != topics_info_map.end()) {
topics_info_map[msg->topic_name].message_count++;
}
}
};
}
} // namespace

SequentialWriter::SequentialWriter(
Expand Down Expand Up @@ -139,10 +122,7 @@ void SequentialWriter::open(
storage_options.max_cache_size);
cache_consumer_ = std::make_unique<rosbag2_cpp::cache::CacheConsumer>(
message_cache_,
make_callback(
storage_,
topics_names_to_info_,
topics_info_mutex_));
std::bind(&SequentialWriter::write_messages, this, std::placeholders::_1));
} else if (storage_options_.snapshot_mode) {
if (storage_options_.max_cache_size == 0) {
throw std::runtime_error(
Expand Down Expand Up @@ -173,22 +153,6 @@ void SequentialWriter::close()
storage_factory_.reset();
}

bool SequentialWriter::take_snapshot()
{
std::lock_guard<std::mutex> lock(message_cache_->consuming_buffer_mutex_);
message_cache_->swap_buffers();
auto msg_vector = message_cache_->consumer_buffer()->data();

storage_->write(msg_vector);
for (const auto & msg : msg_vector) {
std::lock_guard<std::mutex> lock(topics_info_mutex_);
if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {
topics_names_to_info_[msg->topic_name].message_count++;
}
}
return true;
}

void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (topics_names_to_info_.find(topic_with_type.name) !=
Expand Down Expand Up @@ -290,10 +254,7 @@ void SequentialWriter::switch_to_next_storage()
// Set new storage in buffer layer and restart consumer thread
if (use_cache_) {
cache_consumer_->change_consume_callback(
make_callback(
storage_,
topics_names_to_info_,
topics_info_mutex_));
std::bind(&SequentialWriter::write_messages, this, std::placeholders::_1));
}
}

Expand Down Expand Up @@ -347,6 +308,19 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
}
}

bool SequentialWriter::take_snapshot()
{
if (!storage_options_.snapshot_mode) {
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snaphot called when snapshot mode is disabled");
return false;
}
std::lock_guard<std::mutex> lock(message_cache_->consuming_buffer_mutex_);
message_cache_->swap_buffers();
auto msg_vector = message_cache_->consumer_buffer()->data();
write_messages(msg_vector);
return true;
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage>
SequentialWriter::get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
Expand Down Expand Up @@ -402,5 +376,17 @@ void SequentialWriter::finalize_metadata()
}
}

void SequentialWriter::write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages)
{
storage_->write(messages);
for (const auto & msg : messages) {
std::lock_guard<std::mutex> lock(topics_info_mutex_);
if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {
topics_names_to_info_[msg->topic_name].message_count++;
}
}
}

} // namespace writers
} // namespace rosbag2_cpp

0 comments on commit bd852fd

Please sign in to comment.