Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use converters when recording a bag file #57

Merged
merged 22 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7214706
GH-118 Make rosbag2::Writer use converters
botteroa-si Oct 29, 2018
770cc4d
GH-118 Add --encoding option to ros2 bag record
botteroa-si Oct 30, 2018
fd39d6e
GH-118 Associate to each topic its rmw_serialization_format
botteroa-si Oct 30, 2018
8977882
GH-118 Fix tests after rebase
botteroa-si Oct 31, 2018
cb6af77
GH-118 Fix MockMetadataIO and use it in test_writer
botteroa-si Oct 31, 2018
afe2a72
GH-118 Fix Windows build and minor refactoring
Martin-Idel-SI Oct 31, 2018
02bf9bd
GH-118 Add test for writer to check that error is thrown if converter…
botteroa-si Oct 31, 2018
dc90aeb
GH-118 Add test to check that metadat_io_ writes metadata file in wri…
botteroa-si Oct 31, 2018
8d2d311
GH-118 Build Converter before opening the database in Writer::open()
botteroa-si Oct 31, 2018
84baffa
GH-118 Add end-to-end tests to check graceful failure if converter pl…
botteroa-si Oct 31, 2018
b469fc7
GH-118 Rename 'encoding' CLI option to 'serialization_format'
botteroa-si Oct 31, 2018
91e7eb8
GH-127 Write serialization format in database also when it's not spec…
botteroa-si Nov 13, 2018
65d7ea3
GH-137: Fix cdr converter plugin
Martin-Idel-SI Nov 21, 2018
2c28e1d
GH-137 Add integration test for cdr converter
Martin-Idel-SI Nov 21, 2018
827bc40
GH-137 Fix superfluous printf
Martin-Idel-SI Nov 22, 2018
605a15f
GH-17 Add leak sanitizer to test
Martin-Idel-SI Nov 21, 2018
11ba701
GH-137 It suffices to have only one converter test
Martin-Idel-SI Nov 22, 2018
1a3cb60
GH-137 Minor refactoring for better readability of test
anhosi Nov 23, 2018
61d425b
GH-137 Fix memory leak of topic_name
anhosi Nov 23, 2018
5a93f1b
GH-17 Allow disabling the usage of sanitizers
anhosi Nov 23, 2018
aa73556
GH-17 Fix renaming after rebase
Martin-Idel-SI Nov 26, 2018
77608b5
GH-17 Small cleanups (addressing review comments)
anhosi Nov 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-s', '--storage', default='sqlite3',
help='storage identifier to be used, defaults to "sqlite3"')
parser.add_argument(
'-f', '--serialization-format', default='',
help='rmw serialization format in which the messages are saved, defaults to the'
' rmw currently in use')

def create_bag_directory(self, uri):
try:
Expand All @@ -61,9 +65,17 @@ def main(self, *, args): # noqa: D102
self.create_bag_directory(uri)

if args.all:
rosbag2_transport_py.record(uri=uri, storage_id=args.storage, all=True)
rosbag2_transport_py.record(
uri=uri,
storage_id=args.storage,
serialization_format=args.serialization_format,
all=True)
elif args.topics and len(args.topics) > 0:
rosbag2_transport_py.record(uri=uri, storage_id=args.storage, topics=args.topics)
rosbag2_transport_py.record(
uri=uri,
storage_id=args.storage,
serialization_format=args.serialization_format,
topics=args.topics)
else:
self._subparser.print_help()

Expand Down
33 changes: 30 additions & 3 deletions rosbag2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic -Werror)
endif()

option(DISABLE_SANITIZERS "disables the use of gcc saniztizers")

find_package(ament_cmake REQUIRED)
find_package(ament_index_cpp REQUIRED)
find_package(pluginlib REQUIRED)
Expand Down Expand Up @@ -119,11 +121,36 @@ if(BUILD_TESTING)
target_link_libraries(test_sequential_reader rosbag2)
endif()

# If compiling with gcc, run this test with sanitizers enabled
ament_add_gmock(test_ros2_message
test/rosbag2/types/test_ros2_message.cpp)
test/rosbag2/types/test_ros2_message.cpp
src/rosbag2/typesupport_helpers.cpp
src/rosbag2/types/ros2_message.cpp)
if(TARGET test_ros2_message)
target_link_libraries(test_ros2_message rosbag2)
ament_target_dependencies(test_ros2_message test_msgs)
target_compile_definitions(test_ros2_message PRIVATE "ROSBAG2_BUILDING_DLL")
if(NOT DISABLE_SANITIZERS)
target_compile_options(test_ros2_message PUBLIC $<$<CXX_COMPILER_ID:GNU>:-fsanitize=leak>)
target_link_libraries(test_ros2_message $<$<CXX_COMPILER_ID:GNU>:-fsanitize=leak>)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the record: in the Jenkins CI jobs which are using Docker the leak sanitizer was never used for the test. See ros-infrastructure/ros_buildfarm#832.

endif()
target_include_directories(test_ros2_message
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
ament_target_dependencies(test_ros2_message
ament_index_cpp
Poco
rcutils
rosbag2_storage
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
rosidl_typesupport_cpp
test_msgs)
endif()

ament_add_gmock(test_writer
test/rosbag2/test_writer.cpp)
if(TARGET test_writer)
target_link_libraries(test_writer rosbag2)
endif()
endif()

Expand Down
14 changes: 12 additions & 2 deletions rosbag2/include/rosbag2/converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@

namespace rosbag2
{

// Convenience struct to keep both type supports (rmw and introspection) together.
// Only used internally.
struct ConverterTypeSupport
{
const rosidl_message_type_support_t * cpp_type_support;
const rosidl_message_type_support_t * introspection_type_support;
};

class ROSBAG2_PUBLIC Converter
{
public:
explicit
Converter(
const std::string & input_format,
const std::string & output_format,
const std::vector<TopicMetadata> & topics_and_types,
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory =
std::make_shared<SerializationFormatConverterFactory>());

Expand All @@ -60,11 +68,13 @@ class ROSBAG2_PUBLIC Converter
std::shared_ptr<SerializedBagMessage>
convert(std::shared_ptr<const SerializedBagMessage> message);

void add_topic(const std::string & topic, const std::string & type);

private:
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_;
std::unique_ptr<SerializationFormatConverterInterface> input_converter_;
std::unique_ptr<SerializationFormatConverterInterface> output_converter_;
std::map<std::string, std::string> topics_and_types_;
std::map<std::string, ConverterTypeSupport> topics_and_types_;
};

} // namespace rosbag2
Expand Down
5 changes: 4 additions & 1 deletion rosbag2/include/rosbag2/types/ros2_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
typedef struct rosbag2_ros2_message_t
{
void * message;
const char * topic_name;
char * topic_name;
rcutils_time_point_value_t time_stamp;
rcutils_allocator_t allocator;
} rosbag2_ros2_message_t;
Expand All @@ -39,6 +39,9 @@ std::shared_ptr<rosbag2_ros2_message_t>
allocate_ros2_message(
const rosidl_message_type_support_t * introspection_ts, const rcutils_allocator_t * allocator);

ROSBAG2_PUBLIC
void ros2_message_set_topic_name(rosbag2_ros2_message_t * msg, const char * topic_name);

ROSBAG2_PUBLIC
void allocate_internal_types(
void * msg, const rosidl_typesupport_introspection_cpp::MessageMembers * members);
Expand Down
24 changes: 20 additions & 4 deletions rosbag2/include/rosbag2/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#include <memory>
#include <string>

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"
#include "rosbag2/converter.hpp"
#include "rosbag2/serialization_format_converter_factory.hpp"
#include "rosbag2/storage_options.hpp"
#include "rosbag2/types.hpp"
#include "rosbag2/visibility_control.hpp"
Expand All @@ -45,17 +48,27 @@ class ROSBAG2_PUBLIC Writer
public:
explicit
Writer(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> factory =
std::make_unique<rosbag2_storage::StorageFactory>());
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory =
std::make_unique<rosbag2_storage::StorageFactory>(),
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory =
std::make_shared<SerializationFormatConverterFactory>(),
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io =
std::make_unique<rosbag2_storage::MetadataIo>());

virtual ~Writer();

/**
* Opens a new bagfile and prepare it for writing messages. The bagfile must not exist.
* This must be called before any other function is used.
*
* \param options Options to configure the storage
* \param input_serialization_format The serialization format of the messages to write
* \param output_serialization_format The serialization format in which the messages must be saved
*/
virtual void open(const StorageOptions & options);
virtual void open(
const StorageOptions & options,
const std::string & input_serialization_format,
const std::string & output_serialization_format);

/**
* Create a new topic in the underlying storage. Needs to be called for every topic used within
Expand All @@ -76,8 +89,11 @@ class ROSBAG2_PUBLIC Writer

private:
std::string uri_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> factory_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_;
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_;
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_;
std::unique_ptr<Converter> converter_;
};

} // namespace rosbag2
Expand Down
19 changes: 12 additions & 7 deletions rosbag2/src/rosbag2/converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@ namespace rosbag2
Converter::Converter(
const std::string & input_format,
const std::string & output_format,
const std::vector<TopicMetadata> & topics_and_types,
std::shared_ptr<rosbag2::SerializationFormatConverterFactoryInterface> converter_factory)
: converter_factory_(converter_factory)
{
for (const auto & topic_with_type : topics_and_types) {
topics_and_types_.insert({topic_with_type.name, topic_with_type.type});
}
input_converter_ = converter_factory_->load_converter(input_format);
output_converter_ = converter_factory_->load_converter(output_format);
if (!input_converter_) {
Expand All @@ -58,9 +54,8 @@ Converter::~Converter()
std::shared_ptr<SerializedBagMessage> Converter::convert(
std::shared_ptr<const rosbag2::SerializedBagMessage> message)
{
auto ts = get_typesupport(topics_and_types_[message->topic_name], "rosidl_typesupport_cpp");
auto introspection_ts =
get_typesupport(topics_and_types_[message->topic_name], "rosidl_typesupport_introspection_cpp");
auto ts = topics_and_types_.at(message->topic_name).cpp_type_support;
auto introspection_ts = topics_and_types_.at(message->topic_name).introspection_type_support;
auto allocator = rcutils_get_default_allocator();
std::shared_ptr<rosbag2_ros2_message_t> allocated_ros_message =
allocate_ros2_message(introspection_ts, &allocator);
Expand All @@ -72,4 +67,14 @@ std::shared_ptr<SerializedBagMessage> Converter::convert(
return output_message;
}

void Converter::add_topic(const std::string & topic, const std::string & type)
{
ConverterTypeSupport type_support;
type_support.cpp_type_support = get_typesupport(type, "rosidl_typesupport_cpp");
type_support.introspection_type_support =
get_typesupport(type, "rosidl_typesupport_introspection_cpp");

topics_and_types_.insert({topic, type_support});
}

} // namespace rosbag2
22 changes: 20 additions & 2 deletions rosbag2/src/rosbag2/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <utility>
#include <vector>

#include "rosbag2/info.hpp"

namespace rosbag2
{

Expand All @@ -41,13 +43,29 @@ SequentialReader::open(const StorageOptions & options, const std::string & rmw_s
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
auto storage_serialization_format = storage_->get_metadata().serialization_format;
auto topics = storage_->get_metadata().topics_with_message_count;
if (topics.empty()) {
return;
}

// Currently a bag file can only be played if all topics have the same serialization format.
auto storage_serialization_format = topics[0].topic_with_type.serialization_format;
for (const auto & topic : topics) {
if (topic.topic_with_type.serialization_format != storage_serialization_format) {
throw std::runtime_error("Topics with different rwm serialization format have been found. "
"All topics must have the same serialization format.");
}
}

if (rmw_serialization_format != storage_serialization_format) {
converter_ = std::make_unique<Converter>(
storage_serialization_format,
rmw_serialization_format,
storage_->get_all_topics_and_types(),
converter_factory_);
auto topics = storage_->get_all_topics_and_types();
for (const auto & topic_with_type : topics) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion rosbag2/src/rosbag2/types/ros2_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

#include "rosbag2/types/ros2_message.hpp"

#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "rcutils/strdup.h"
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rosidl_typesupport_introspection_cpp/field_types.hpp"
#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp"
Expand All @@ -34,6 +34,7 @@ allocate_ros2_message(
introspection_ts->data);
auto raw_ros2_message = new rosbag2_ros2_message_t();
raw_ros2_message->allocator = *allocator;
raw_ros2_message->topic_name = nullptr;
raw_ros2_message->message = raw_ros2_message->allocator.zero_allocate(
1, intro_ts_members->size_of_, raw_ros2_message->allocator.state);
// TODO(Martin-Idel-SI): Use custom allocator to make sure all memory is obtained that way
Expand All @@ -45,12 +46,22 @@ allocate_ros2_message(
return std::shared_ptr<rosbag2_ros2_message_t>(raw_ros2_message, deleter);
}

void ros2_message_set_topic_name(rosbag2_ros2_message_t * msg, const char * topic_name)
{
if (msg->topic_name) {
msg->allocator.deallocate(msg->topic_name, msg->allocator.state);
msg->topic_name = nullptr;
}
msg->topic_name = rcutils_strdup(topic_name, msg->allocator);
}

void deallocate_ros2_message(
rosbag2_ros2_message_t * msg,
const rosidl_typesupport_introspection_cpp::MessageMembers * members)
{
deallocate_internal_types(msg->message, members);
msg->allocator.deallocate(msg->message, msg->allocator.state);
msg->allocator.deallocate(msg->topic_name, msg->allocator.state);
delete msg;
}

Expand Down
37 changes: 28 additions & 9 deletions rosbag2/src/rosbag2/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,45 @@
#include <string>
#include <utility>

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2/info.hpp"
#include "rosbag2/storage_options.hpp"

namespace rosbag2
{

Writer::Writer(std::unique_ptr<rosbag2_storage::StorageFactoryInterface> factory)
: factory_(std::move(factory))
Writer::Writer(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_(std::move(storage_factory)),
converter_factory_(std::move(converter_factory)),
metadata_io_(std::move(metadata_io)),
converter_(nullptr)
{}

Writer::~Writer()
{
if (!uri_.empty()) {
rosbag2_storage::MetadataIo metadata_io;
metadata_io.write_metadata(uri_, storage_->get_metadata());
metadata_io_->write_metadata(uri_, storage_->get_metadata());
}

storage_.reset(); // Necessary to ensure that the writer is destroyed before the factory
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
}

void Writer::open(const StorageOptions & options)
void Writer::open(
const StorageOptions & options,
const std::string & input_serialization_format,
const std::string & output_serialization_format)
{
storage_ = factory_->open_read_write(options.uri, options.storage_id);
if (output_serialization_format != input_serialization_format) {
converter_ = std::make_unique<Converter>(
input_serialization_format,
output_serialization_format,
converter_factory_);
}

storage_ = storage_factory_->open_read_write(options.uri, options.storage_id);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
Expand All @@ -54,6 +69,10 @@ void Writer::create_topic(const TopicMetadata & topic_with_type)
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

if (converter_) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}

storage_->create_topic(topic_with_type);
}

Expand All @@ -63,7 +82,7 @@ void Writer::write(std::shared_ptr<SerializedBagMessage> message)
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

storage_->write(message);
storage_->write(converter_ ? converter_->convert(message) : message);
}

} // namespace rosbag2
Loading