Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use serialized message directly #24

Merged
merged 21 commits into from
Sep 3, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
22e771d
Adapt new interface
Martin-Idel-SI Aug 10, 2018
ab73851
Try to write and read rcutils_char_array_t BLOBs in sqlite
botteroa-si Aug 10, 2018
b155060
Add simple test for arbitrary char ptr
Martin-Idel-SI Aug 14, 2018
79c1a3d
Refactor SqliteWrapper and add tests
botteroa-si Aug 14, 2018
3afe8bd
Write and read actual timestamp from serialized message and add relat…
botteroa-si Aug 14, 2018
b7831bd
Add SqliteStatementWrapper class and refactor SqliteStorage and Sqlit…
botteroa-si Aug 14, 2018
18f8c70
Refactor test fixture
botteroa-si Aug 16, 2018
94bf386
GH-50 Assert message content in write_integration_test, and remove TODOs
botteroa-si Aug 16, 2018
226740e
GH-50 Remove sqlite_storage_plugin unit tests
botteroa-si Aug 16, 2018
e2237ac
GH-50 Refactor SqliteStatements and SqliteStorage
botteroa-si Aug 17, 2018
114bdd9
GH-50 Make has_next() method no more const
botteroa-si Aug 21, 2018
0a23b29
GH-50 Fix build after rebase
botteroa-si Aug 21, 2018
e4ca9e9
GH-50 Refactor after rebase
botteroa-si Aug 22, 2018
f618390
GH-52 Extend statement wrapper with a generic bind
anhosi Aug 22, 2018
6074626
GH-59 cleanup db interface
anhosi Aug 22, 2018
96281bf
GH-59 Introduce general read interface for sqlite statements
anhosi Aug 23, 2018
d3061aa
GH-59 Cleanup: remove unused files
anhosi Aug 23, 2018
3ae2da8
GH-59 make sqlite interface fluent
anhosi Aug 24, 2018
179a1d9
GH-59 move creation of serialized message to rosbag2_storage
anhosi Aug 24, 2018
725bf95
Change rcutil_char_array_t to rmw_serialized_message_t in subscriber
Martin-Idel-SI Aug 28, 2018
45f1508
Remove debugging output in test
Martin-Idel-SI Aug 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
GH-59 Introduce general read interface for sqlite statements
- Uses a std::tuple for row data
- Exposes an iterator interface for the query result
  • Loading branch information
anhosi authored and Martin-Idel-SI committed Aug 30, 2018
commit 96281bfd51880686f9a3dd100bfbe77cae3e98de
13 changes: 13 additions & 0 deletions rosbag2_storage_default_plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()

ament_add_gmock(sqlite_wrapper_integration_test
test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp
src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp
src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET sqlite_wrapper_integration_test)
ament_target_dependencies(sqlite_wrapper_integration_test
rosbag2_storage
rcutils
SQLite3
pluginlib)
endif()

ament_add_gmock(sqlite_storage_integration_test
test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp
src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ void SqliteStatementWrapper::execute_and_reset()
reset();
}

void SqliteStatementWrapper::advance_one_row()
{
int row = sqlite3_step(statement_);
if (row != SQLITE_ROW) {
throw SqliteException("No more rows available in table.");
}
}

void SqliteStatementWrapper::bind(int value)
{
auto return_code = sqlite3_bind_int(statement_, ++last_bound_parameter_index_, value);
Expand Down Expand Up @@ -98,23 +90,64 @@ void SqliteStatementWrapper::bind(std::shared_ptr<rcutils_char_array_t> value)
check_and_report_bind_error(return_code);
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SqliteStatementWrapper::read_table_entry(
int blob_column, int timestamp_column)
void SqliteStatementWrapper::reset()
{
sqlite3_reset(statement_);
sqlite3_clear_bindings(statement_);
last_bound_parameter_index_ = 0;
written_blobs_cache_.clear();
}

bool SqliteStatementWrapper::step()
{
int return_code = sqlite3_step(statement_);
if (return_code == SQLITE_ROW) {
return true;
} else if (return_code == SQLITE_DONE) {
return false;
} else {
throw SqliteException("Error reading query result: " + std::to_string(return_code));
}
}

void SqliteStatementWrapper::obtain_column_value(size_t index, int & value) const
{
value = sqlite3_column_int(statement_, static_cast<int>(index));
}

void
SqliteStatementWrapper::obtain_column_value(size_t index, rcutils_time_point_value_t & value) const
{
value = sqlite3_column_int64(statement_, static_cast<int>(index));
}

void SqliteStatementWrapper::obtain_column_value(size_t index, double & value) const
{
value = sqlite3_column_double(statement_, static_cast<int>(index));
}

void SqliteStatementWrapper::obtain_column_value(size_t index, std::string & value) const
{
value = reinterpret_cast<const char *>(sqlite3_column_text(statement_, static_cast<int>(index)));
}

void SqliteStatementWrapper::obtain_column_value(
size_t index, std::shared_ptr<rcutils_char_array_t> & value) const
{
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
int buffer_size = sqlite3_column_bytes(statement_, blob_column);
auto data = sqlite3_column_blob(statement_, static_cast<int>(index));
auto size = static_cast<size_t>(sqlite3_column_bytes(statement_, static_cast<int>(index)));

auto rcutils_allocator = new rcutils_allocator_t;
*rcutils_allocator = rcutils_get_default_allocator();
auto msg = new rcutils_char_array_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, buffer_size, rcutils_allocator);
auto ret = rcutils_char_array_init(msg, size, rcutils_allocator);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources for serialized message" +
throw std::runtime_error("Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string_safe()));
}

message->serialized_data = std::shared_ptr<rcutils_char_array_t>(msg,
value = std::shared_ptr<rcutils_char_array_t>(msg,
[](rcutils_char_array_t * msg) {
int error = rcutils_char_array_fini(msg);
delete msg;
Expand All @@ -125,22 +158,8 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SqliteStatementWrapper::r
}
});

memcpy(
reinterpret_cast<unsigned char *>(message->serialized_data->buffer),
sqlite3_column_blob(statement_, blob_column),
buffer_size);
message->serialized_data->buffer_length = buffer_size;
message->time_stamp = sqlite3_column_int64(statement_, timestamp_column);

return message;
}

void SqliteStatementWrapper::reset()
{
sqlite3_reset(statement_);
sqlite3_clear_bindings(statement_);
last_bound_parameter_index_ = 0;
written_blobs_cache_.clear();
memcpy(value->buffer, data, size);
value->buffer_length = size;
}

void SqliteStatementWrapper::check_and_report_bind_error(int return_code)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

Expand All @@ -37,8 +38,102 @@ class SqliteStatementWrapper : public std::enable_shared_from_this<SqliteStateme
SqliteStatementWrapper & operator=(const SqliteStatementWrapper &) = delete;
~SqliteStatementWrapper();

template<typename ... Columns>
class QueryResult
{
public:
using RowType = std::tuple<Columns ...>;
class Iterator : public std::iterator<std::input_iterator_tag, RowType>
{
public:
static const int POSITION_END = -1;
Iterator(std::shared_ptr<SqliteStatementWrapper> statement, int position)
: statement_(statement), next_row_idx_(position)
{
if (next_row_idx_ != POSITION_END) {
if (statement_->step()) {
++next_row_idx_;
} else {
next_row_idx_ = POSITION_END;
}
}
}

Iterator & operator++()
{
if (next_row_idx_ != POSITION_END) {
if (statement_->step()) {
++next_row_idx_;
} else {
next_row_idx_ = POSITION_END;
}
return *this;
} else {
throw SqliteException("Cannot increment result iterator beyond result set!");
}
}
Iterator operator++(int)
{
auto old_value = *this;
++(*this);
return old_value;
}

RowType operator*() const
{
RowType row{};
obtain_row_values(row);
return row;
}

bool operator==(Iterator other) const
{
return statement_ == other.statement_ && next_row_idx_ == other.next_row_idx_;
}
bool operator!=(Iterator other) const
{
return !(*this == other);
}

private:
template<typename Indices = std::index_sequence_for<Columns ...>>
void obtain_row_values(RowType & row) const
{
obtain_row_values_impl(row, Indices{});
}

template<size_t I, size_t ... Is, typename RemainingIndices = std::index_sequence<Is ...>>
void obtain_row_values_impl(RowType & row, std::index_sequence<I, Is ...>) const
{
statement_->obtain_column_value(I, std::get<I>(row));
obtain_row_values_impl(row, RemainingIndices{});
}
void obtain_row_values_impl(RowType &, std::index_sequence<>) const {} // end of recursion

std::shared_ptr<SqliteStatementWrapper> statement_;
int next_row_idx_;
};

explicit QueryResult(std::shared_ptr<SqliteStatementWrapper> statement)
: statement_(statement)
{}

Iterator begin()
{
return Iterator(statement_, 0);
}
Iterator end()
{
return Iterator(statement_, Iterator::POSITION_END);
}

private:
std::shared_ptr<SqliteStatementWrapper> statement_;
};

void execute_and_reset();
void advance_one_row();
template<typename ... Columns>
QueryResult<Columns ...> execute_query();

template<typename T1, typename T2, typename ... Params>
void bind(T1 value1, T2 value2, Params ... values);
Expand All @@ -48,12 +143,17 @@ class SqliteStatementWrapper : public std::enable_shared_from_this<SqliteStateme
void bind(std::string value);
void bind(std::shared_ptr<rcutils_char_array_t> value);

std::shared_ptr<rosbag2_storage::SerializedBagMessage>
read_table_entry(int blob_column, int timestamp_column);

void reset();

private:
bool step();

void obtain_column_value(size_t index, int & value) const;
void obtain_column_value(size_t index, rcutils_time_point_value_t & value) const;
void obtain_column_value(size_t index, double & value) const;
void obtain_column_value(size_t index, std::string & value) const;
void obtain_column_value(size_t index, std::shared_ptr<rcutils_char_array_t> & value) const;

template<typename T>
void check_and_report_bind_error(int return_code, T value);
void check_and_report_bind_error(int return_code);
Expand Down Expand Up @@ -89,6 +189,13 @@ void SqliteStatementWrapper::check_and_report_bind_error(int return_code, T valu
check_and_report_bind_error(return_code, std::to_string(value));
}

template<typename ... Columns>
inline
SqliteStatementWrapper::QueryResult<Columns ...> SqliteStatementWrapper::execute_query()
{
return QueryResult<Columns ...>(shared_from_this());
}

using SqliteStatement = std::shared_ptr<SqliteStatementWrapper>;

} // namespace rosbag2_storage_plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ namespace rosbag2_storage_plugins
const char * ROS_PACKAGE_NAME = "rosbag2_storage_default_plugins";

SqliteStorage::SqliteStorage()
: database_(),
bag_info_(),
write_statement_(nullptr),
read_statement_(nullptr),
ready_to_read_next_(false)
: database_(), bag_info_(), write_statement_(nullptr), read_statement_(nullptr),
message_result_(nullptr),
current_message_row_(nullptr, SqliteStatementWrapper::QueryResult<>::Iterator::POSITION_END)
{}

void SqliteStorage::open(
Expand Down Expand Up @@ -69,29 +67,19 @@ bool SqliteStorage::has_next()
prepare_for_reading();
}

try {
read_statement_->advance_one_row();
ready_to_read_next_ = true;
return true;
} catch (const SqliteException &) {
return false;
}
return current_message_row_ != message_result_.end();
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SqliteStorage::read_next()
{
if (!ready_to_read_next_) {
if (!read_statement_) {
prepare_for_reading();
}
read_statement_->advance_one_row();
if (!read_statement_) {
prepare_for_reading();
}

ready_to_read_next_ = false;

int blob_data_column = 1;
int timestamp_column = 2;
return read_statement_->read_table_entry(blob_data_column, timestamp_column);
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
std::tie(bag_message->serialized_data, bag_message->time_stamp) = *current_message_row_;
++current_message_row_;
return bag_message;
}

void SqliteStorage::initialize()
Expand Down Expand Up @@ -122,7 +110,11 @@ void SqliteStorage::prepare_for_writing()

void SqliteStorage::prepare_for_reading()
{
read_statement_ = database_->prepare_statement("SELECT * FROM messages ORDER BY id;");
read_statement_ =
database_->prepare_statement("SELECT data, timestamp FROM messages ORDER BY id;");
message_result_ = read_statement_->execute_query<
std::shared_ptr<rcutils_char_array_t>, rcutils_time_point_value_t>();
current_message_row_ = message_result_.begin();
}

} // namespace rosbag2_storage_plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ class SqliteStorage : public rosbag2_storage::storage_interfaces::ReadWriteInter
rosbag2_storage::BagInfo bag_info_;
SqliteStatement write_statement_;
SqliteStatement read_statement_;
bool ready_to_read_next_;
SqliteStatementWrapper::QueryResult<std::shared_ptr<rcutils_char_array_t>,
rcutils_time_point_value_t> message_result_;
SqliteStatementWrapper::QueryResult<std::shared_ptr<rcutils_char_array_t>,
rcutils_time_point_value_t>::Iterator current_message_row_;
};

} // namespace rosbag2_storage_plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ TEST_F(StorageTestFixture, string_messages_are_written_and_read_to_and_from_sqli
auto read_messages = read_all_messages_from_sqlite();

ASSERT_THAT(read_messages, SizeIs(3));
EXPECT_THAT(deserialize_message(*read_messages[0]), Eq(string_messages[0].first));
EXPECT_THAT(deserialize_message(*read_messages[1]), Eq(string_messages[1].first));
EXPECT_THAT(deserialize_message(*read_messages[2]), Eq(string_messages[2].first));
EXPECT_THAT(deserialize_message(read_messages[0]->serialized_data), Eq(string_messages[0].first));
EXPECT_THAT(deserialize_message(read_messages[1]->serialized_data), Eq(string_messages[1].first));
EXPECT_THAT(deserialize_message(read_messages[2]->serialized_data), Eq(string_messages[2].first));
}

TEST_F(StorageTestFixture, message_roundtrip_with_arbitrary_char_array_works_correctly) {
Expand Down
Loading