Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bag rewriter (C++) #920

Merged
merged 4 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ find_package(shared_queues_vendor REQUIRED)
find_package(yaml_cpp_vendor REQUIRED)

add_library(${PROJECT_NAME} SHARED
src/rosbag2_transport/bag_rewrite.cpp
src/rosbag2_transport/player.cpp
src/rosbag2_transport/qos.cpp
src/rosbag2_transport/reader_writer_factory.cpp
Expand Down Expand Up @@ -186,6 +187,8 @@ function(create_tests_for_rmw_implementation)
endfunction()

if(BUILD_TESTING)
add_definitions(-D_SRC_RESOURCES_DIR_PATH="${CMAKE_CURRENT_SOURCE_DIR}/test/resources")

find_package(ament_cmake_gmock REQUIRED)
find_package(ament_index_cpp REQUIRED)
find_package(ament_lint_auto REQUIRED)
Expand All @@ -205,6 +208,12 @@ if(BUILD_TESTING)
target_include_directories(test_topic_filter PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>)
target_link_libraries(test_topic_filter rosbag2_transport)

ament_add_gmock(test_rewrite
test/rosbag2_transport/test_rewrite.cpp)
target_include_directories(test_rewrite PRIVATE include)
target_link_libraries(test_rewrite ${PROJECT_NAME})
ament_target_dependencies(test_rewrite keyboard_handler rcpputils rosbag2_cpp test_msgs)
endif()

ament_package()
47 changes: 47 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/bag_rewrite.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_
#define ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_

#include <memory>
#include <utility>
#include <vector>

#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_transport/record_options.hpp"

namespace rosbag2_transport
{
/// Given one or more existing bags, write out one or more new bags with new settings.
/// This generic feature enables (but is not limited to) the following features:
/// - merge (multiple input bags, one output bag)
/// - split (one input bag, one output bag with some size or duration splitting values)
/// - filter (input bag(s) - output bag(s) accept different topics)
/// - compress
/// - serialization format conversion
///
/// \param input_options vector of settings to create Readers for bags to read messages from
/// \param output_bags - full "recording" configuration of the bag(s) to write messages to
/// Each output bag will be passed messages from every input bag,
/// on topics that pass its filtering settings
void bag_rewrite(
const std::vector<rosbag2_storage::StorageOptions> & input_options,
const std::vector<
std::pair<rosbag2_storage::StorageOptions, rosbag2_transport::RecordOptions>
> & output_options
);
} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_
198 changes: 198 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rosbag2_transport/bag_rewrite.hpp"

#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "rosbag2_cpp/reader.hpp"
#include "rosbag2_cpp/writer.hpp"
#include "rosbag2_transport/reader_writer_factory.hpp"

#include "logging.hpp"
#include "topic_filter.hpp"

namespace
{

/// Find the next chronological message from all opened input bags.
/// Updates the next_messages queue as necessary.
/// next_messages is needed because Reader has no "peek" interface, we cannot put a message back.
std::shared_ptr<rosbag2_storage::SerializedBagMessage> get_next(
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> & next_messages)
{
// find message with lowest timestamp
std::shared_ptr<rosbag2_storage::SerializedBagMessage> earliest_msg = nullptr;
size_t earliest_msg_index = -1;
for (size_t i = 0; i < next_messages.size(); i++) {
// refill queue if bag not empty
if (next_messages[i] == nullptr && input_bags[i]->has_next()) {
next_messages[i] = input_bags[i]->read_next();
}

auto & msg = next_messages[i];
if (msg == nullptr) {
continue;
}
if (earliest_msg == nullptr || msg->time_stamp < earliest_msg->time_stamp) {
earliest_msg = msg;
earliest_msg_index = i;
}
}

// clear returned message from queue before returning it, so it can be refilled next time
if (earliest_msg != nullptr) {
next_messages[earliest_msg_index].reset();
}
return earliest_msg;
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
}

/// Discover what topics are in the inputs, filter out topics that can't be processed,
/// create_topic on Writers that will receive topics.
/// Return a map f topic -> vector of which Writers want to receive that topic,
/// based on the RecordOptions.
/// The output vector has bare pointers to the uniquely owned Writers,
/// so this may not outlive the output_bags Writers.
std::unordered_map<std::string, std::vector<rosbag2_cpp::Writer *>>
setup_topic_filtering(
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
const std::vector<
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
> & output_bags)
{
std::unordered_map<std::string, std::vector<rosbag2_cpp::Writer *>> filtered_outputs;
std::map<std::string, std::vector<std::string>> input_topics;
std::unordered_map<std::string, YAML::Node> input_topics_qos_profiles;
std::unordered_map<std::string, std::string> input_topics_serialization_format;

for (const auto & input_bag : input_bags) {
auto bag_topics_and_types = input_bag->get_all_topics_and_types();
for (const auto & topic_metadata : bag_topics_and_types) {
const std::string & topic_name = topic_metadata.name;
input_topics.try_emplace(topic_name);
input_topics[topic_name].push_back(topic_metadata.type);
input_topics_serialization_format[topic_name] = topic_metadata.serialization_format;

// Gather all offered qos profiles from all inputs
input_topics_qos_profiles.try_emplace(topic_name);
YAML::Node & all_offered = input_topics_qos_profiles[topic_name];
YAML::Node offered_qos_profiles = YAML::Load(topic_metadata.offered_qos_profiles);
for (auto qos : offered_qos_profiles) {
all_offered.push_back(qos);
}
}
}

for (const auto & [writer, record_options] : output_bags) {
rosbag2_transport::TopicFilter topic_filter{record_options};
auto filtered_topics_and_types = topic_filter.filter_topics(input_topics);

// Done filtering - set up writer
for (const auto & [topic_name, topic_type] : filtered_topics_and_types) {
rosbag2_storage::TopicMetadata topic_metadata;
topic_metadata.name = topic_name;
topic_metadata.type = topic_type;

// Take source serialization format for the topic if output format is unspecified
if (record_options.rmw_serialization_format.empty()) {
topic_metadata.serialization_format = input_topics_serialization_format[topic_name];
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
} else {
topic_metadata.serialization_format = record_options.rmw_serialization_format;
}

std::stringstream qos_profiles;
qos_profiles << input_topics_qos_profiles[topic_name];
topic_metadata.offered_qos_profiles = qos_profiles.str();
writer->create_topic(topic_metadata);

filtered_outputs.try_emplace(topic_name);
filtered_outputs[topic_name].push_back(writer.get());
}
}

return filtered_outputs;
}

void perform_rewrite(
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
const std::vector<
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
> & output_bags
)
{
if (input_bags.empty() || output_bags.empty()) {
throw std::runtime_error("Must provide at least one input and one output bag to rewrite.");
}

auto topic_outputs = setup_topic_filtering(input_bags, output_bags);

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> next_messages;
next_messages.resize(input_bags.size(), nullptr);

std::shared_ptr<rosbag2_storage::SerializedBagMessage> next_msg;
while (next_msg = get_next(input_bags, next_messages)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduced a warning message into the buildfarm. See nightly_osx_debug#2246.

It's probably unintended usage, would you take a look? @emersonknapp

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intended usage - as noted above I wasn't 100% sure that OSX would be happy. Will add the parentheses suggested by Clang ASAP.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto topic_writers = topic_outputs.find(next_msg->topic_name);
if (topic_writers != topic_outputs.end()) {
for (auto writer : topic_writers->second) {
writer->write(next_msg);
}
}
}
}

} // namespace

namespace rosbag2_transport
{
void bag_rewrite(
const std::vector<rosbag2_storage::StorageOptions> & input_options,
const std::vector<
std::pair<rosbag2_storage::StorageOptions, rosbag2_transport::RecordOptions>
> & output_options
)
{
std::vector<std::unique_ptr<rosbag2_cpp::Reader>> input_bags;
std::vector<
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
> output_bags;

for (const auto & storage_options : input_options) {
auto reader = ReaderWriterFactory::make_reader(storage_options);
reader->open(storage_options);
input_bags.push_back(std::move(reader));
}

for (auto & [storage_options, record_options] : output_options) {
// TODO(emersonknapp) - utilize cache to get better performance.
// For now, zero cache allows for synchronous writes which are guaranteed to go through.
// With cache enabled, the buffer could overflow and drop messages in this fast-write loop.
// To enable the cache we will need to implement a mechanism for the writer to take messages
// only when it is able to, which will likely require some new APIs.
auto zero_cache_storage_options = storage_options;
zero_cache_storage_options.max_cache_size = 0u;
auto writer = ReaderWriterFactory::make_writer(record_options);
writer->open(zero_cache_storage_options);
output_bags.push_back(std::make_pair(std::move(writer), record_options));
}

perform_rewrite(input_bags, output_bags);
}
} // namespace rosbag2_transport
32 changes: 32 additions & 0 deletions rosbag2_transport/test/resources/rewriter_a/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
rosbag2_bagfile_information:
version: 5
storage_identifier: sqlite3
duration:
nanoseconds: 100000000
starting_time:
nanoseconds_since_epoch: 0
message_count: 150
topics_with_message_count:
- topic_metadata:
name: a_empty
type: test_msgs/msg/Empty
serialization_format: cdr
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false\n- history: 1\n depth: 2\n reliability: 2\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
message_count: 100
- topic_metadata:
name: b_basictypes
type: test_msgs/msg/BasicTypes
serialization_format: cdr
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
message_count: 50
compression_format: ""
compression_mode: ""
relative_file_paths:
- rewriter_a_0.db3
files:
- path: rewriter_a_0.db3
starting_time:
nanoseconds_since_epoch: 0
duration:
nanoseconds: 100000000
message_count: 150
Binary file not shown.
32 changes: 32 additions & 0 deletions rosbag2_transport/test/resources/rewriter_b/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
rosbag2_bagfile_information:
version: 5
storage_identifier: sqlite3
duration:
nanoseconds: 222000000
starting_time:
nanoseconds_since_epoch: 0
message_count: 75
topics_with_message_count:
- topic_metadata:
name: c_strings
type: test_msgs/msg/Strings
serialization_format: cdr
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
message_count: 50
- topic_metadata:
name: a_empty
type: test_msgs/msg/Empty
serialization_format: cdr
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
message_count: 25
compression_format: ""
compression_mode: ""
relative_file_paths:
- rewriter_b_0.db3
files:
- path: rewriter_b_0.db3
starting_time:
nanoseconds_since_epoch: 0
duration:
nanoseconds: 222000000
message_count: 75
Binary file not shown.
5 changes: 2 additions & 3 deletions rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ class RosBag2PlaySeekTestFixture : public RosBag2PlayTestFixture
topic_types_ = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", rmw_get_serialization_format(), ""}};

const rcpputils::fs::path current_file_path{__FILE__};
const rcpputils::fs::path base = current_file_path.parent_path().parent_path();
const rcpputils::fs::path bag_path = base / "resources" / "test_bag_for_seek";
const rcpputils::fs::path base{_SRC_RESOURCES_DIR_PATH};
const rcpputils::fs::path bag_path = base / "test_bag_for_seek";

storage_options_ = rosbag2_storage::StorageOptions({bag_path.string(), "sqlite3", 0, 0, 0});
play_options_.read_ahead_queue_size = 2;
Expand Down
Loading