diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 87072ada89..357a825131 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -182,6 +182,8 @@ function(create_tests_for_rmw_implementation) endfunction() if(BUILD_TESTING) + add_definitions(-D_SRC_RESOURCES_DIR_PATH="${CMAKE_CURRENT_SOURCE_DIR}/test/resources") + find_package(ament_cmake_gmock REQUIRED) find_package(ament_index_cpp REQUIRED) find_package(ament_lint_auto REQUIRED) @@ -201,6 +203,12 @@ if(BUILD_TESTING) target_include_directories(test_topic_filter PRIVATE $) target_link_libraries(test_topic_filter rosbag2_transport) + + ament_add_gmock(test_rewrite + test/rosbag2_transport/test_rewrite.cpp) + target_include_directories(test_rewrite PRIVATE include) + target_link_libraries(test_rewrite ${PROJECT_NAME}) + ament_target_dependencies(test_rewrite keyboard_handler rcpputils rosbag2_cpp test_msgs) endif() ament_package() diff --git a/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp b/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp index ad8e246ac9..86f2fc5fdf 100644 --- a/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp +++ b/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp @@ -81,6 +81,7 @@ setup_topic_filtering( std::unordered_map> filtered_outputs; std::map> input_topics; std::unordered_map input_topics_qos_profiles; + std::unordered_map input_topics_serialization_format; for (const auto & input_bag : input_bags) { auto bag_topics_and_types = input_bag->get_all_topics_and_types(); @@ -88,6 +89,7 @@ setup_topic_filtering( const std::string & topic_name = topic_metadata.name; input_topics.try_emplace(topic_name); input_topics[topic_name].push_back(topic_metadata.type); + input_topics_serialization_format[topic_name] = topic_metadata.serialization_format; // Gather all offered qos profiles from all inputs input_topics_qos_profiles.try_emplace(topic_name); @@ -108,7 +110,14 @@ setup_topic_filtering( rosbag2_storage::TopicMetadata topic_metadata; topic_metadata.name = topic_name; topic_metadata.type = topic_type; - topic_metadata.serialization_format = record_options.rmw_serialization_format; + + // Take source serialization format for the topic if output format is unspecified + if (record_options.rmw_serialization_format.empty()) { + topic_metadata.serialization_format = input_topics_serialization_format[topic_name]; + } else { + topic_metadata.serialization_format = record_options.rmw_serialization_format; + } + std::stringstream qos_profiles; qos_profiles << input_topics_qos_profiles[topic_name]; topic_metadata.offered_qos_profiles = qos_profiles.str(); diff --git a/rosbag2_transport/test/resources/rewriter_a/metadata.yaml b/rosbag2_transport/test/resources/rewriter_a/metadata.yaml new file mode 100644 index 0000000000..3c0b8ae7c8 --- /dev/null +++ b/rosbag2_transport/test/resources/rewriter_a/metadata.yaml @@ -0,0 +1,32 @@ +rosbag2_bagfile_information: + version: 5 + storage_identifier: sqlite3 + duration: + nanoseconds: 100000000 + starting_time: + nanoseconds_since_epoch: 0 + message_count: 150 + topics_with_message_count: + - topic_metadata: + name: a_empty + type: test_msgs/msg/Empty + serialization_format: cdr + offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false\n- history: 1\n depth: 2\n reliability: 2\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false" + message_count: 100 + - topic_metadata: + name: b_basictypes + type: test_msgs/msg/BasicTypes + serialization_format: cdr + offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false" + message_count: 50 + compression_format: "" + compression_mode: "" + relative_file_paths: + - rewriter_a_0.db3 + files: + - path: rewriter_a_0.db3 + starting_time: + nanoseconds_since_epoch: 0 + duration: + nanoseconds: 100000000 + message_count: 150 diff --git a/rosbag2_transport/test/resources/rewriter_a/rewriter_a_0.db3 b/rosbag2_transport/test/resources/rewriter_a/rewriter_a_0.db3 new file mode 100644 index 0000000000..baaa015852 Binary files /dev/null and b/rosbag2_transport/test/resources/rewriter_a/rewriter_a_0.db3 differ diff --git a/rosbag2_transport/test/resources/rewriter_b/metadata.yaml b/rosbag2_transport/test/resources/rewriter_b/metadata.yaml new file mode 100644 index 0000000000..333882eb8a --- /dev/null +++ b/rosbag2_transport/test/resources/rewriter_b/metadata.yaml @@ -0,0 +1,32 @@ +rosbag2_bagfile_information: + version: 5 + storage_identifier: sqlite3 + duration: + nanoseconds: 222000000 + starting_time: + nanoseconds_since_epoch: 0 + message_count: 75 + topics_with_message_count: + - topic_metadata: + name: c_strings + type: test_msgs/msg/Strings + serialization_format: cdr + offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false" + message_count: 50 + - topic_metadata: + name: a_empty + type: test_msgs/msg/Empty + serialization_format: cdr + offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false" + message_count: 25 + compression_format: "" + compression_mode: "" + relative_file_paths: + - rewriter_b_0.db3 + files: + - path: rewriter_b_0.db3 + starting_time: + nanoseconds_since_epoch: 0 + duration: + nanoseconds: 222000000 + message_count: 75 diff --git a/rosbag2_transport/test/resources/rewriter_b/rewriter_b_0.db3 b/rosbag2_transport/test/resources/rewriter_b/rewriter_b_0.db3 new file mode 100644 index 0000000000..2fedfc6a58 Binary files /dev/null and b/rosbag2_transport/test/resources/rewriter_b/rewriter_b_0.db3 differ diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp index 532e86c460..c14f9fe44d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp @@ -41,9 +41,8 @@ class RosBag2PlaySeekTestFixture : public RosBag2PlayTestFixture topic_types_ = std::vector{ {"topic1", "test_msgs/BasicTypes", rmw_get_serialization_format(), ""}}; - const rcpputils::fs::path current_file_path{__FILE__}; - const rcpputils::fs::path base = current_file_path.parent_path().parent_path(); - const rcpputils::fs::path bag_path = base / "resources" / "test_bag_for_seek"; + const rcpputils::fs::path base{_SRC_RESOURCES_DIR_PATH}; + const rcpputils::fs::path bag_path = base / "test_bag_for_seek"; storage_options_ = rosbag2_storage::StorageOptions({bag_path.string(), "sqlite3", 0, 0, 0}); play_options_.read_ahead_queue_size = 2; diff --git a/rosbag2_transport/test/rosbag2_transport/test_rewrite.cpp b/rosbag2_transport/test/rosbag2_transport/test_rewrite.cpp new file mode 100644 index 0000000000..2e8b96b6ac --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_rewrite.cpp @@ -0,0 +1,191 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rcpputils/filesystem_helper.hpp" +#include "rosbag2_transport/bag_rewrite.hpp" +#include "rosbag2_transport/reader_writer_factory.hpp" + +using namespace ::testing; // NOLINT + + +/* +Builtin knowledge about the bags under test: +rewriter_a: + - a_empty: + - test_msgs/Empty + - 100 messages + - 2 offered QoS profiles + - b_basictypes: + - test_msgs/BasicTypes + - 50 messages + - 1 offered QoS profile +rewriter_b: + - a_empty: + - test_msgs/Empty + - 25 messages + - 1 offered Qos Profile + - c_strings: + - test_msgs/Strings + - 50 messages + - 1 offered QoS Profile +*/ +class TestRewrite : public Test +{ +public: + TestRewrite() + : output_dir_(rcpputils::fs::create_temp_directory("test_bag_rewrite")) + {} + + void use_input_a() + { + rosbag2_storage::StorageOptions storage; + storage.uri = (bags_path_ / "rewriter_a").string(); + input_bags_.push_back(storage); + } + + void use_input_b() + { + rosbag2_storage::StorageOptions storage; + storage.uri = (bags_path_ / "rewriter_b").string(); + input_bags_.push_back(storage); + } + + ~TestRewrite() + { + // rcpputils::fs::remove_all(output_dir_); + } + + const rcpputils::fs::path bags_path_{_SRC_RESOURCES_DIR_PATH}; + const rcpputils::fs::path output_dir_; + std::vector input_bags_; + std::vector> + output_bags_; +}; + +TEST_F(TestRewrite, test_noop_rewrite) { + use_input_a(); + + rosbag2_storage::StorageOptions output_storage; + output_storage.uri = (output_dir_ / "unchanged").string(); + output_storage.storage_id = "sqlite3"; + rosbag2_transport::RecordOptions output_record; + output_record.all = true; + output_bags_.push_back({output_storage, output_record}); + + rosbag2_transport::bag_rewrite(input_bags_, output_bags_); + + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(output_storage); + reader->open(output_storage); + const auto metadata = reader->get_metadata(); + EXPECT_EQ(metadata.message_count, 100u + 50u); + EXPECT_THAT(metadata.topics_with_message_count, SizeIs(2)); + EXPECT_EQ(metadata.topics_with_message_count[0].topic_metadata.serialization_format, "cdr"); +} + +TEST_F(TestRewrite, test_merge) { + use_input_a(); + use_input_b(); + + rosbag2_storage::StorageOptions output_storage; + output_storage.uri = (output_dir_ / "merged").string(); + output_storage.storage_id = "sqlite3"; + rosbag2_transport::RecordOptions output_record; + output_record.all = true; + output_bags_.push_back({output_storage, output_record}); + + rosbag2_transport::bag_rewrite(input_bags_, output_bags_); + + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(output_storage); + reader->open(output_storage); + const auto metadata = reader->get_metadata(); + EXPECT_EQ(metadata.message_count, 100u + 50u + 50u + 25u); + + // Check that all 3 topics are present (a_empty merged from both) + EXPECT_THAT(metadata.topics_with_message_count, SizeIs(3)); + + // Check that offered QoS profiles got concatenated + for (const auto & topic_info : metadata.topics_with_message_count) { + const auto topic = topic_info.topic_metadata; + if (topic.name == "a_empty") { + YAML::Node qos_node = YAML::Load(topic.offered_qos_profiles); + EXPECT_TRUE(qos_node.IsSequence()); + EXPECT_EQ(qos_node.size(), 3u); + } + } +} + +TEST_F(TestRewrite, test_filter_split) { + use_input_a(); + + { + rosbag2_storage::StorageOptions storage_opts; + storage_opts.uri = (output_dir_ / "split1").string(); + storage_opts.storage_id = "sqlite3"; + rosbag2_transport::RecordOptions rec_opts; + rec_opts.all = true; + rec_opts.exclude = ".*basic.*"; + output_bags_.push_back({storage_opts, rec_opts}); + } + { + rosbag2_storage::StorageOptions storage_opts; + storage_opts.uri = (output_dir_ / "split2").string(); + storage_opts.storage_id = "sqlite3"; + rosbag2_transport::RecordOptions rec_opts; + rec_opts.all = false; + rec_opts.topics = {"b_basictypes"}; + output_bags_.push_back({storage_opts, rec_opts}); + } + + rosbag2_transport::bag_rewrite(input_bags_, output_bags_); + + { + auto opts = output_bags_[0].first; + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(opts); + reader->open(opts); + const auto metadata = reader->get_metadata(); + EXPECT_THAT(metadata.topics_with_message_count, SizeIs(1)); + EXPECT_EQ(metadata.topics_with_message_count[0].topic_metadata.name, "a_empty"); + EXPECT_EQ(metadata.message_count, 100u); + } + { + auto opts = output_bags_[1].first; + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(opts); + reader->open(opts); + const auto metadata = reader->get_metadata(); + EXPECT_THAT(metadata.topics_with_message_count, SizeIs(1)); + EXPECT_EQ(metadata.topics_with_message_count[0].topic_metadata.name, "b_basictypes"); + EXPECT_EQ(metadata.message_count, 50u); + } +} + +TEST_F(TestRewrite, test_compress) { + use_input_a(); + + rosbag2_storage::StorageOptions output_storage; + output_storage.uri = (output_dir_ / "compressed").string(); + output_storage.storage_id = "sqlite3"; + rosbag2_transport::RecordOptions output_record; + output_record.all = true; + output_record.compression_mode = "file"; + output_record.compression_format = "zstd"; + output_bags_.push_back({output_storage, output_record}); + + rosbag2_transport::bag_rewrite(input_bags_, output_bags_); + + auto compressed_bagfile = output_dir_ / "compressed" / "compressed_0.db3.zstd"; + EXPECT_TRUE(compressed_bagfile.exists()); + EXPECT_TRUE(compressed_bagfile.is_regular_file()); +}