[WIP] Adding Rewriter #709

Closed
Changes from 6 commits
8 changes: 8 additions & 0 deletions rosbag2_cpp/CMakeLists.txt
@@ -58,6 +58,7 @@ add_library(${PROJECT_NAME} SHARED
src/rosbag2_cpp/info.cpp
src/rosbag2_cpp/reader.cpp
src/rosbag2_cpp/readers/sequential_reader.cpp
src/rosbag2_cpp/reshapers/stitcher.cpp
src/rosbag2_cpp/rmw_implemented_serialization_format_converter.cpp
src/rosbag2_cpp/serialization_format_converter_factory.cpp
src/rosbag2_cpp/types/introspection_message.cpp
@@ -224,6 +225,13 @@ if(BUILD_TESTING)
target_link_libraries(test_multifile_reader ${PROJECT_NAME})
endif()

ament_add_gmock(test_reshaper_stitcher
test/rosbag2_cpp/test_reshaper_stitcher.cpp)
if(TARGET test_reshaper_stitcher)
ament_target_dependencies(test_reshaper_stitcher rosbag2_storage)
target_link_libraries(test_reshaper_stitcher ${PROJECT_NAME})
endif()

ament_add_gmock(test_time_controller_clock
test/rosbag2_cpp/test_time_controller_clock.cpp)
if(TARGET test_time_controller_clock)
156 changes: 156 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/reshapers/stitcher.hpp
@@ -0,0 +1,156 @@
// Copyright 2018, Bosch Software Innovations GmbH.
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2021 Firefly Automatix, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__RESHAPERS__STITCHER_HPP_
#define ROSBAG2_CPP__RESHAPERS__STITCHER_HPP_

#include <memory>
#include <string>
#include <vector>

#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"

// This is necessary because of using stl types here. It is completely safe, because
// a) the member is not accessible from the outside
// b) there are no inline functions.
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable:4251)
#endif

namespace rosbag2_cpp
{
namespace reader_interfaces
{
class BaseReaderInterface;
} // namespace reader_interfaces
namespace writer_interfaces
{
class BaseWriterInterface;
} // namespace writer_interfaces

class ROSBAG2_CPP_PUBLIC Stitcher final
Collaborator

Docstrings for all classes, please. What exactly is the intended purpose of this class? Knowing that helps inform the rest of the review.

{
static constexpr char const * kDefaultStorageID = "sqlite3";

public:
explicit Stitcher(
std::unique_ptr<reader_interfaces::BaseReaderInterface> reader_impl =
std::make_unique<readers::SequentialReader>(),
std::unique_ptr<writer_interfaces::BaseWriterInterface> writer_impl =
std::make_unique<writers::SequentialWriter>());

~Stitcher();

/**
* Opens the list of URIs and prepares it for stitching.
* Each URI in this vector must be a bagfile that exists.
* This must be called before any other function is used.
*
* \note This overload uses the default options:
* * the sqlite3 storage backend
* * no converter options, so messages are stored in the incoming serialization format
* \sa rmw_get_serialization_format.
* To specify storage and converter options, use the other \sa open overload.
*
* \param storage_uris A vector of URIs of the bags to read.
* \param output_uri The URI of the storage to write to.
**/
void open(const std::vector<std::string> & storage_uris, const std::string & output_uri);
Collaborator

This API is "too low-level" - you should let existing classes do the things they already know how to do. E.g. if you are trying to stitch together "files within a single bag" - then you should point at the bag, not the files. This should probably be something like

Suggested change
- void open(const std::vector<std::string> & storage_uris, const std::string & output_uri);
+ void open(rosbag2_storage::StorageOptions input_storage, rosbag2_storage::StorageOptions output_storage);

This allows you to simply pass input_storage to Reader::open() (and output_storage to Writer::open())

What this API also suggests is that this class is not just a "Stitcher" - it's a "BagReshaper" or a "BagConverter" - what it would actually do is

  1. Open an existing bag that was recorded with some configuration
  2. Write all messages from that bag to a new bag with some new configuration.

This should allow you to convert from one storage implementation to another, split large bagfiles into more, smaller ones, or combine many small bagfiles into fewer, larger ones. The "Stitch into one" assumption is too limited as an API!
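To make the "same loop, different shape" idea concrete, here is a minimal sketch that does not use the rosbag2 classes. `Message`, `BagFile`, `Bag`, and `reshape` are hypothetical stand-ins invented for illustration, and the `max_messages_per_file` setting is an assumed example of an output configuration. The point is only that one message-by-message copy loop can both combine and split, depending on the output settings.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are NOT the rosbag2 types.
struct Message
{
  std::string topic;
  long long time_stamp;
};
using BagFile = std::vector<Message>;  // one storage file
using Bag = std::vector<BagFile>;      // a bag made of one or more files

// Copy every message of `input` into a new bag whose files hold at most
// `max_messages_per_file` messages. A limit of 0 means "no limit", which
// yields a single output file.
Bag reshape(const Bag & input, std::size_t max_messages_per_file)
{
  Bag output;
  for (const BagFile & file : input) {
    for (const Message & msg : file) {
      const bool need_new_file = output.empty() ||
        (max_messages_per_file != 0 &&
        output.back().size() >= max_messages_per_file);
      if (need_new_file) {
        output.emplace_back();
      }
      assert(!output.empty());
      output.back().push_back(msg);
    }
  }
  return output;
}
```

With a limit of 0 the three-file input collapses into one file (the "stitch" case); with a small limit the same loop splits the input instead.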

Author

Ah, OK. So do you imagine this API really doing the "reshaping", with the "front-end" being the system that decides what is to be shaped and how?

In other words, should I rename and redo this from stitcher to reshaper, leaving the details of how the bag is reshaped (stitched or split) to the consumer of the API?

Collaborator

Yes - I think that this would be more appropriate as a Reshaper, and "stitching a multibag into a monobag" is just one possible behavior from given inputs.

Given your implementation of "read messages one at a time and send to writer" - this more general implementation should look exactly the same under the hood, but provide much more flexible functionality.

Collaborator

To go one step further - I think this API is much too large. As far as I can tell from the feature description, the public API should be a single function, as follows:

class Rewriter
{
public:
Rewriter(reader, writer);
void rewrite(StorageOptions input_options, StorageOptions output_options);
};

Naming things is hard... but given the description of the functionality, I'm almost thinking that it's more of a "Rewriter", because it reads a bag and writes it again with new settings to a new location. It doesn't have to change the shape; it may be changing the storage implementation, compression, or serialization format without changing the shape.
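A minimal sketch of how such a single-function `Rewriter` could look, assuming toy stand-ins rather than the real rosbag2 classes. `StorageOptions`, `Message`, `FakeDisk`, `Reader`, and `Writer` below are all hypothetical and exist only so the example compiles on its own; the real reader and writer interfaces take more parameters (for example converter options).

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration; NOT the rosbag2 classes.
struct StorageOptions { std::string uri; std::string storage_id; };
struct Message { std::string topic; std::string payload; };
using FakeDisk = std::map<std::string, std::vector<Message>>;  // URI -> messages

class Reader
{
public:
  explicit Reader(const FakeDisk & disk) : disk_(disk) {}
  // Throws std::out_of_range if no bag exists at options.uri.
  void open(const StorageOptions & options) { messages_ = disk_.at(options.uri); next_ = 0; }
  bool has_next() const { return next_ < messages_.size(); }
  Message read_next() { return messages_[next_++]; }

private:
  const FakeDisk & disk_;
  std::vector<Message> messages_;
  std::size_t next_ = 0;
};

class Writer
{
public:
  explicit Writer(FakeDisk & disk) : disk_(disk) {}
  void open(const StorageOptions & options) { uri_ = options.uri; disk_[uri_].clear(); }
  void write(const Message & msg) { disk_[uri_].push_back(msg); }

private:
  FakeDisk & disk_;
  std::string uri_;
};

class Rewriter
{
public:
  Rewriter(std::unique_ptr<Reader> reader, std::unique_ptr<Writer> writer)
  : reader_(std::move(reader)), writer_(std::move(writer))
  {
    assert(reader_ && writer_);
  }

  // The whole public API: read one bag and write it again with new options.
  void rewrite(const StorageOptions & input_options, const StorageOptions & output_options)
  {
    reader_->open(input_options);
    writer_->open(output_options);
    while (reader_->has_next()) {
      writer_->write(reader_->read_next());
    }
  }

private:
  std::unique_ptr<Reader> reader_;
  std::unique_ptr<Writer> writer_;
};
```

The caller never sees `has_next()` or per-message stepping; everything about the output is carried by the options passed to `rewrite`.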

Author

Ok yeah, I think I mimicked the API around the Reader api. But I totally agree it can be stripped down.

However, is there not a need to pass in ConverterOptions as well?

Collaborator

> I mimicked the API around the Reader api

It's better to think about what functionality you're trying to provide. Your implementation will call the Reader/Writer APIs, but this thing isn't the same type of thing, so there is no reason for it to have the same API.

> However, is there not a need to pass in ConverterOptions as well?

Yes, probably

Author

See discussion here: #709 (comment)

I think there is no way to do a many-to-one bagfile rewrite without this API knowing about multiple bags to read.
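As a rough illustration of the many-to-one case, the sketch below takes several inputs at once and produces one output. `Message`, `Bag`, and `stitch` are hypothetical names, not rosbag2 API. Sorting the result by timestamp is an assumption made here for the example; the thread does not settle whether the output should be time-ordered or simply concatenated.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; NOT the rosbag2 types.
struct Message
{
  std::string topic;
  long long time_stamp;
};
using Bag = std::vector<Message>;

// Many-to-one: the function has to receive all inputs, because a
// single-input call cannot know about the other bags.
Bag stitch(const std::vector<Bag> & inputs)
{
  Bag output;
  for (const Bag & bag : inputs) {
    output.insert(output.end(), bag.begin(), bag.end());
  }
  // Assumption: order the merged messages by timestamp. stable_sort keeps
  // the input order for messages that share a timestamp.
  const auto by_time = [](const Message & a, const Message & b) {
      return a.time_stamp < b.time_stamp;
    };
  std::stable_sort(output.begin(), output.end(), by_time);
  assert(std::is_sorted(output.begin(), output.end(), by_time));
  return output;
}
```

Dropping the sort turns this into plain concatenation, which is what reading the input bags one after another amounts to.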


/**
* Opens the list of URIs and prepares it for stitching.
* Each URI in this vector must be a bagfile that exists.
* This must be called before any other function is used.
*
* \param storage_uris A vector of URIs of the bags to read.
* \param writer_storage_options Custom storage options for the output bag, including its URI.
* The storage for the writer must match the reader.
* \param converter_options Converter options used for both reading and writing.
**/
void open(
const std::vector<std::string> & storage_uris,
const rosbag2_storage::StorageOptions & writer_storage_options,
const ConverterOptions & converter_options);

/**
* Ask whether the stitcher has at least one more URI to stitch.
*
* \return true if there remains at least one more unstitched bag
* \throws runtime_error if Stitcher is not open.
*/
bool has_next();

/**
* Stitch the next bag from the list of storage URIs into the output.
* Throws if no storage URI remains to stitch.
*
* Expected usage:
* if (stitcher.has_next()) {stitcher.stitch_next();}
*/
void stitch_next();

/**
* Set filters to adhere to during stitching. This only works for the CURRENT reader.
*
* \param storage_filter Filter to apply to stitching
* \throws runtime_error if the Stitcher is not open.
*/
void set_filter(const rosbag2_storage::StorageFilter & storage_filter);

/**
* Reset all filters for stitching. This only works for the CURRENT reader.
*/
void reset_filter();

reader_interfaces::BaseReaderInterface & get_reader_implementation_handle() const
{
return *reader_impl_;
}

writer_interfaces::BaseWriterInterface & get_writer_implementation_handle() const
{
return *writer_impl_;
}

private:
std::unique_ptr<reader_interfaces::BaseReaderInterface> reader_impl_;
std::unique_ptr<writer_interfaces::BaseWriterInterface> writer_impl_;

std::string storage_id_;
ConverterOptions converter_options_;

std::vector<std::string> storage_uris_;
std::vector<std::string>::iterator current_iter_;

bool has_been_opened_ = false;
};

} // namespace rosbag2_cpp

#ifdef _WIN32
# pragma warning(pop)
#endif

#endif // ROSBAG2_CPP__RESHAPERS__STITCHER_HPP_
115 changes: 115 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reshapers/stitcher.cpp
@@ -0,0 +1,115 @@
// Copyright 2018, Bosch Software Innovations GmbH.
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2021 Firefly Automatix, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rosbag2_cpp/reshapers/stitcher.hpp"

#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"

namespace rosbag2_cpp
{

Stitcher::Stitcher(
std::unique_ptr<reader_interfaces::BaseReaderInterface> reader_impl,
std::unique_ptr<writer_interfaces::BaseWriterInterface> writer_impl)
: reader_impl_(std::move(reader_impl)), writer_impl_(std::move(writer_impl))
{}

Stitcher::~Stitcher()
{
reader_impl_->reset();
writer_impl_->reset();
}

void Stitcher::open(
const std::vector<std::string> & storage_uris,
const std::string & output_uri)
{
rosbag2_storage::StorageOptions writer_storage_options;
writer_storage_options.uri = output_uri;
writer_storage_options.storage_id = kDefaultStorageID;
rosbag2_cpp::ConverterOptions converter_options{};

open(storage_uris, writer_storage_options, converter_options);
}

void Stitcher::open(
const std::vector<std::string> & storage_uris,
const rosbag2_storage::StorageOptions & writer_storage_options,
const ConverterOptions & converter_options)
{
// open writer object for output
writer_impl_->open(writer_storage_options, converter_options);

// store default converter, storage options
converter_options_ = converter_options;
storage_id_ = writer_storage_options.storage_id;

// store uris to read for later use (storage_uris is a const reference, so this is a copy)
storage_uris_ = storage_uris;
current_iter_ = storage_uris_.begin();

has_been_opened_ = true;
}

bool Stitcher::has_next()
{
if (!has_been_opened_) {
throw std::runtime_error("stitcher has not been opened yet");
}
return current_iter_ != storage_uris_.end();
}

void Stitcher::stitch_next()
{
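if (!has_been_opened_) {
throw std::runtime_error("stitcher has not been opened yet");
}
// dereferencing the end iterator is undefined behavior, so fail loudly instead
if (current_iter_ == storage_uris_.end()) {
throw std::runtime_error("no more storage URIs to stitch");
}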
// read bag from current iterator
rosbag2_storage::StorageOptions storage_options;
storage_options.uri = *current_iter_;
storage_options.storage_id = storage_id_;
reader_impl_->open(storage_options, converter_options_);

// write contents of bag to output writer
while (reader_impl_->has_next()) {
auto bag_message = reader_impl_->read_next();
writer_impl_->write(bag_message);
}

// advance iterator
current_iter_++;

// clear reader impl to get ready for next iteration
reader_impl_->reset();
}

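void Stitcher::set_filter(const rosbag2_storage::StorageFilter & storage_filter)
{
// documented in the header: throws if the Stitcher is not open
if (!has_been_opened_) {
throw std::runtime_error("stitcher has not been opened yet");
}
reader_impl_->set_filter(storage_filter);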
}

void Stitcher::reset_filter()
{
reader_impl_->reset_filter();
}

} // namespace rosbag2_cpp
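
The `has_next()` / `stitch_next()` pair above is essentially a cursor over the list of input URIs. The sketch below isolates that cursor in a hypothetical `UriCursor` class (not part of this PR or of rosbag2) to show one way to keep the end-of-list check and the advance step in a single place. It stores an index rather than an iterator, which is a design choice made for this example: an index stays valid if the object is copied or the vector is reassigned.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper for illustration; not part of the PR.
class UriCursor
{
public:
  explicit UriCursor(std::vector<std::string> uris)
  : uris_(std::move(uris)) {}

  // True while at least one URI has not been handed out yet.
  bool has_next() const
  {
    assert(next_ <= uris_.size());
    return next_ < uris_.size();
  }

  // Returns the next URI and advances. Throws when the list is exhausted
  // instead of reading past the end.
  const std::string & next()
  {
    if (!has_next()) {
      throw std::out_of_range("no more URIs to stitch");
    }
    return uris_[next_++];
  }

private:
  std::vector<std::string> uris_;
  std::size_t next_ = 0;
};
```

A caller would loop with `while (cursor.has_next()) { process(cursor.next()); }`, where `process` stands for whatever opens and copies one bag.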