Skip to content

Commit

Permalink
Use converters when recording a bag file (ros2#57)
Browse files Browse the repository at this point in the history
* ros2GH-118 Make rosbag2::Writer use converters

- Use converters in Writer::write() when input rmw serialization format is different from desired storage serialization format
- Add new field in rosbag2::StorageOptions to keep track of the rmw format given by the user to store the message in

* ros2GH-118 Add --encoding option to ros2 bag record

* ros2GH-118 Associate to each topic its rmw_serialization_format

- Add 'serialization_format' field to TopicMetadata
- Add 'serialization_forat' column in 'topics' table in sqlite storage
- Remove 'storage_format' from BagMetadata and use the TopicMetadata field directly, instead
- the field 'rmw_serialization_format' has been moved from rosbag2::StorageOptions to rosbag2_transport::RecordOptions, because it's a topic property rather than a storage one.
- Currently all topics in a bag file must have the same serialization format
- The tests have been updated accordingly

* ros2GH-118 Fix tests after rebase

* ros2GH-118 Fix MockMetadataIO and use it in test_writer

* ros2GH-118 Fix Windows build and minor refactoring

* ros2GH-118 Add test for writer to check that error is thrown if converter plugin does not exist

* ros2GH-118 Add test to check that metadat_io_ writes metadata file in writer's destructor

* ros2GH-118 Build Converter before opening the database in Writer::open()

- This assures that if one of the converter plugins does not exist, the database is not created

* ros2GH-118 Add end-to-end tests to check graceful failure if converter plugins do not exists

- Both a test for record and play has been added

* ros2GH-118 Rename 'encoding' CLI option to 'serialization_format'

* ros2GH-127 Write serialization format in database also when it's not specified at CLI level

- Tests to check that the serialization format is written in the database have also been added.

* ros2GH-17 Add leak sanitizer to test

- one of the main test goals can only be ssen by valgrind or sanitizers
- enable leak sanitizer for gcc builds only (for now)

* ros2GH-137: Fix cdr converter plugin

- update pluginlib descriptions file after several renames
- fix export of missing includes folder

* ros2GH-137 Add integration test for cdr converter

* ros2GH-137 Fix superfluous printf

* ros2GH-137 It suffices to have only one converter test

* ros2GH-137 Minor refactoring for better readability of test

N.B. This exposes an pre-existing memory leak (not fixed here).

* ros2GH-137 Fix memory leak of topic_name

- topic_name member needs to be freed
- provide a setter for convenience
- Directly assigning a string literal in the test is not sufficient as
  this would be static memory that does not need to be freed.

* ros2GH-17 Allow disabling the usage of sanitizers

This allows manual usage of valgrind.

* ros2GH-17 Fix renaming after rebase

* ros2GH-17 Small cleanups (addressing review comments)
  • Loading branch information
botteroa-si authored and Karsten1987 committed Nov 29, 2018
1 parent 28ffe9f commit 30c4733
Show file tree
Hide file tree
Showing 63 changed files with 625 additions and 161 deletions.
16 changes: 14 additions & 2 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-s', '--storage', default='sqlite3',
help='storage identifier to be used, defaults to "sqlite3"')
parser.add_argument(
'-f', '--serialization-format', default='',
help='rmw serialization format in which the messages are saved, defaults to the'
' rmw currently in use')

def create_bag_directory(self, uri):
try:
Expand All @@ -61,9 +65,17 @@ def main(self, *, args): # noqa: D102
self.create_bag_directory(uri)

if args.all:
rosbag2_transport_py.record(uri=uri, storage_id=args.storage, all=True)
rosbag2_transport_py.record(
uri=uri,
storage_id=args.storage,
serialization_format=args.serialization_format,
all=True)
elif args.topics and len(args.topics) > 0:
rosbag2_transport_py.record(uri=uri, storage_id=args.storage, topics=args.topics)
rosbag2_transport_py.record(
uri=uri,
storage_id=args.storage,
serialization_format=args.serialization_format,
topics=args.topics)
else:
self._subparser.print_help()

Expand Down
33 changes: 30 additions & 3 deletions rosbag2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic -Werror)
endif()

option(DISABLE_SANITIZERS "disables the use of gcc saniztizers")

find_package(ament_cmake REQUIRED)
find_package(ament_index_cpp REQUIRED)
find_package(pluginlib REQUIRED)
Expand Down Expand Up @@ -119,11 +121,36 @@ if(BUILD_TESTING)
target_link_libraries(test_sequential_reader rosbag2)
endif()

# If compiling with gcc, run this test with sanitizers enabled
ament_add_gmock(test_ros2_message
test/rosbag2/types/test_ros2_message.cpp)
test/rosbag2/types/test_ros2_message.cpp
src/rosbag2/typesupport_helpers.cpp
src/rosbag2/types/ros2_message.cpp)
if(TARGET test_ros2_message)
target_link_libraries(test_ros2_message rosbag2)
ament_target_dependencies(test_ros2_message test_msgs)
target_compile_definitions(test_ros2_message PRIVATE "ROSBAG2_BUILDING_DLL")
if(NOT DISABLE_SANITIZERS)
target_compile_options(test_ros2_message PUBLIC $<$<CXX_COMPILER_ID:GNU>:-fsanitize=leak>)
target_link_libraries(test_ros2_message $<$<CXX_COMPILER_ID:GNU>:-fsanitize=leak>)
endif()
target_include_directories(test_ros2_message
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
ament_target_dependencies(test_ros2_message
ament_index_cpp
Poco
rcutils
rosbag2_storage
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
rosidl_typesupport_cpp
test_msgs)
endif()

ament_add_gmock(test_writer
test/rosbag2/test_writer.cpp)
if(TARGET test_writer)
target_link_libraries(test_writer rosbag2)
endif()
endif()

Expand Down
14 changes: 12 additions & 2 deletions rosbag2/include/rosbag2/converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@

namespace rosbag2
{

// Convenience struct to keep both type supports (rmw and introspection) together.
// Only used internally.
struct ConverterTypeSupport
{
const rosidl_message_type_support_t * cpp_type_support;
const rosidl_message_type_support_t * introspection_type_support;
};

class ROSBAG2_PUBLIC Converter
{
public:
explicit
Converter(
const std::string & input_format,
const std::string & output_format,
const std::vector<TopicMetadata> & topics_and_types,
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory =
std::make_shared<SerializationFormatConverterFactory>());

Expand All @@ -60,11 +68,13 @@ class ROSBAG2_PUBLIC Converter
std::shared_ptr<SerializedBagMessage>
convert(std::shared_ptr<const SerializedBagMessage> message);

void add_topic(const std::string & topic, const std::string & type);

private:
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_;
std::unique_ptr<SerializationFormatConverterInterface> input_converter_;
std::unique_ptr<SerializationFormatConverterInterface> output_converter_;
std::map<std::string, std::string> topics_and_types_;
std::map<std::string, ConverterTypeSupport> topics_and_types_;
};

} // namespace rosbag2
Expand Down
5 changes: 4 additions & 1 deletion rosbag2/include/rosbag2/types/ros2_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
typedef struct rosbag2_ros2_message_t
{
void * message;
const char * topic_name;
char * topic_name;
rcutils_time_point_value_t time_stamp;
rcutils_allocator_t allocator;
} rosbag2_ros2_message_t;
Expand All @@ -39,6 +39,9 @@ std::shared_ptr<rosbag2_ros2_message_t>
allocate_ros2_message(
const rosidl_message_type_support_t * introspection_ts, const rcutils_allocator_t * allocator);

ROSBAG2_PUBLIC
void ros2_message_set_topic_name(rosbag2_ros2_message_t * msg, const char * topic_name);

ROSBAG2_PUBLIC
void allocate_internal_types(
void * msg, const rosidl_typesupport_introspection_cpp::MessageMembers * members);
Expand Down
24 changes: 20 additions & 4 deletions rosbag2/include/rosbag2/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#include <memory>
#include <string>

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"
#include "rosbag2/converter.hpp"
#include "rosbag2/serialization_format_converter_factory.hpp"
#include "rosbag2/storage_options.hpp"
#include "rosbag2/types.hpp"
#include "rosbag2/visibility_control.hpp"
Expand All @@ -45,17 +48,27 @@ class ROSBAG2_PUBLIC Writer
public:
explicit
Writer(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> factory =
std::make_unique<rosbag2_storage::StorageFactory>());
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory =
std::make_unique<rosbag2_storage::StorageFactory>(),
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory =
std::make_shared<SerializationFormatConverterFactory>(),
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io =
std::make_unique<rosbag2_storage::MetadataIo>());

virtual ~Writer();

/**
* Opens a new bagfile and prepare it for writing messages. The bagfile must not exist.
* This must be called before any other function is used.
*
* \param options Options to configure the storage
* \param input_serialization_format The serialization format of the messages to write
* \param output_serialization_format The serialization format in which the messages must be saved
*/
virtual void open(const StorageOptions & options);
virtual void open(
const StorageOptions & options,
const std::string & input_serialization_format,
const std::string & output_serialization_format);

/**
* Create a new topic in the underlying storage. Needs to be called for every topic used within
Expand All @@ -76,8 +89,11 @@ class ROSBAG2_PUBLIC Writer

private:
std::string uri_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> factory_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_;
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_;
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_;
std::unique_ptr<Converter> converter_;
};

} // namespace rosbag2
Expand Down
19 changes: 12 additions & 7 deletions rosbag2/src/rosbag2/converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@ namespace rosbag2
Converter::Converter(
const std::string & input_format,
const std::string & output_format,
const std::vector<TopicMetadata> & topics_and_types,
std::shared_ptr<rosbag2::SerializationFormatConverterFactoryInterface> converter_factory)
: converter_factory_(converter_factory)
{
for (const auto & topic_with_type : topics_and_types) {
topics_and_types_.insert({topic_with_type.name, topic_with_type.type});
}
input_converter_ = converter_factory_->load_converter(input_format);
output_converter_ = converter_factory_->load_converter(output_format);
if (!input_converter_) {
Expand All @@ -58,9 +54,8 @@ Converter::~Converter()
std::shared_ptr<SerializedBagMessage> Converter::convert(
std::shared_ptr<const rosbag2::SerializedBagMessage> message)
{
auto ts = get_typesupport(topics_and_types_[message->topic_name], "rosidl_typesupport_cpp");
auto introspection_ts =
get_typesupport(topics_and_types_[message->topic_name], "rosidl_typesupport_introspection_cpp");
auto ts = topics_and_types_.at(message->topic_name).cpp_type_support;
auto introspection_ts = topics_and_types_.at(message->topic_name).introspection_type_support;
auto allocator = rcutils_get_default_allocator();
std::shared_ptr<rosbag2_ros2_message_t> allocated_ros_message =
allocate_ros2_message(introspection_ts, &allocator);
Expand All @@ -72,4 +67,14 @@ std::shared_ptr<SerializedBagMessage> Converter::convert(
return output_message;
}

void Converter::add_topic(const std::string & topic, const std::string & type)
{
ConverterTypeSupport type_support;
type_support.cpp_type_support = get_typesupport(type, "rosidl_typesupport_cpp");
type_support.introspection_type_support =
get_typesupport(type, "rosidl_typesupport_introspection_cpp");

topics_and_types_.insert({topic, type_support});
}

} // namespace rosbag2
22 changes: 20 additions & 2 deletions rosbag2/src/rosbag2/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <utility>
#include <vector>

#include "rosbag2/info.hpp"

namespace rosbag2
{

Expand All @@ -41,13 +43,29 @@ SequentialReader::open(const StorageOptions & options, const std::string & rmw_s
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
auto storage_serialization_format = storage_->get_metadata().serialization_format;
auto topics = storage_->get_metadata().topics_with_message_count;
if (topics.empty()) {
return;
}

// Currently a bag file can only be played if all topics have the same serialization format.
auto storage_serialization_format = topics[0].topic_with_type.serialization_format;
for (const auto & topic : topics) {
if (topic.topic_with_type.serialization_format != storage_serialization_format) {
throw std::runtime_error("Topics with different rwm serialization format have been found. "
"All topics must have the same serialization format.");
}
}

if (rmw_serialization_format != storage_serialization_format) {
converter_ = std::make_unique<Converter>(
storage_serialization_format,
rmw_serialization_format,
storage_->get_all_topics_and_types(),
converter_factory_);
auto topics = storage_->get_all_topics_and_types();
for (const auto & topic_with_type : topics) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion rosbag2/src/rosbag2/types/ros2_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

#include "rosbag2/types/ros2_message.hpp"

#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "rcutils/strdup.h"
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rosidl_typesupport_introspection_cpp/field_types.hpp"
#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp"
Expand All @@ -34,6 +34,7 @@ allocate_ros2_message(
introspection_ts->data);
auto raw_ros2_message = new rosbag2_ros2_message_t();
raw_ros2_message->allocator = *allocator;
raw_ros2_message->topic_name = nullptr;
raw_ros2_message->message = raw_ros2_message->allocator.zero_allocate(
1, intro_ts_members->size_of_, raw_ros2_message->allocator.state);
// TODO(Martin-Idel-SI): Use custom allocator to make sure all memory is obtained that way
Expand All @@ -45,12 +46,22 @@ allocate_ros2_message(
return std::shared_ptr<rosbag2_ros2_message_t>(raw_ros2_message, deleter);
}

void ros2_message_set_topic_name(rosbag2_ros2_message_t * msg, const char * topic_name)
{
if (msg->topic_name) {
msg->allocator.deallocate(msg->topic_name, msg->allocator.state);
msg->topic_name = nullptr;
}
msg->topic_name = rcutils_strdup(topic_name, msg->allocator);
}

void deallocate_ros2_message(
rosbag2_ros2_message_t * msg,
const rosidl_typesupport_introspection_cpp::MessageMembers * members)
{
deallocate_internal_types(msg->message, members);
msg->allocator.deallocate(msg->message, msg->allocator.state);
msg->allocator.deallocate(msg->topic_name, msg->allocator.state);
delete msg;
}

Expand Down
37 changes: 28 additions & 9 deletions rosbag2/src/rosbag2/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,45 @@
#include <string>
#include <utility>

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2/info.hpp"
#include "rosbag2/storage_options.hpp"

namespace rosbag2
{

Writer::Writer(std::unique_ptr<rosbag2_storage::StorageFactoryInterface> factory)
: factory_(std::move(factory))
Writer::Writer(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_(std::move(storage_factory)),
converter_factory_(std::move(converter_factory)),
metadata_io_(std::move(metadata_io)),
converter_(nullptr)
{}

Writer::~Writer()
{
if (!uri_.empty()) {
rosbag2_storage::MetadataIo metadata_io;
metadata_io.write_metadata(uri_, storage_->get_metadata());
metadata_io_->write_metadata(uri_, storage_->get_metadata());
}

storage_.reset(); // Necessary to ensure that the writer is destroyed before the factory
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
}

void Writer::open(const StorageOptions & options)
void Writer::open(
const StorageOptions & options,
const std::string & input_serialization_format,
const std::string & output_serialization_format)
{
storage_ = factory_->open_read_write(options.uri, options.storage_id);
if (output_serialization_format != input_serialization_format) {
converter_ = std::make_unique<Converter>(
input_serialization_format,
output_serialization_format,
converter_factory_);
}

storage_ = storage_factory_->open_read_write(options.uri, options.storage_id);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
Expand All @@ -54,6 +69,10 @@ void Writer::create_topic(const TopicMetadata & topic_with_type)
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

if (converter_) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}

storage_->create_topic(topic_with_type);
}

Expand All @@ -63,7 +82,7 @@ void Writer::write(std::shared_ptr<SerializedBagMessage> message)
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

storage_->write(message);
storage_->write(converter_ ? converter_->convert(message) : message);
}

} // namespace rosbag2
Loading

0 comments on commit 30c4733

Please sign in to comment.