Skip to content

Commit

Permalink
Enhanced dbsim to store view and position in storage engine
Browse files Browse the repository at this point in the history
Mimic real DBMS implementation by storing view and position
into storage engine.
  • Loading branch information
temeo committed Jan 25, 2019
1 parent 632f8c3 commit f30d9c0
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dbsim/db_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void db::client::run_one_transaction()
err = err || client_state_.after_prepare();
}
err = err || client_state_.before_commit();
if (err == 0) se_trx_.commit();
if (err == 0) se_trx_.commit(transaction.ws_meta().gtid());
err = err || client_state_.ordered_commit();
err = err || client_state_.after_commit();
if (err)
Expand Down
2 changes: 1 addition & 1 deletion dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
{
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
int ret(client_.client_state_.before_commit());
if (ret == 0) client_.se_trx_.commit();
if (ret == 0) client_.se_trx_.commit(ws_meta.gtid());
ret = ret || client_.client_state_.ordered_commit();
ret = ret || client_.client_state_.after_commit();
return ret;
Expand Down
22 changes: 15 additions & 7 deletions dbsim/db_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,23 @@ db::params db::parse_args(int argc, char** argv)
"debug logging level: 0 - none, 1 - verbose")
("fast-exit", po::value<int>(&params.fast_exit),
"exit from simulation without graceful shutdown");
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help"))
try
{
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help"))
{
std::cerr << desc << "\n";
exit(0);
}

validate_params(params);
}
catch (...)
{
std::cerr << desc << "\n";
exit(0);
exit(1);
}

validate_params(params);
return params;
}
20 changes: 10 additions & 10 deletions dbsim/db_server_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

db::server_service::server_service(db::server& server)
: server_(server)
, logged_view_()
{ }

wsrep::storage_service* db::server_service::storage_service(
Expand Down Expand Up @@ -110,7 +109,7 @@ void db::server_service::log_view(wsrep::high_priority_service*,
const wsrep::view& v)
{
wsrep::log_info() << "View:\n" << v;
logged_view_ = v;
server_.storage_engine().store_view(v);
}

void db::server_service::recover_streaming_appliers(
Expand All @@ -126,22 +125,23 @@ void db::server_service::recover_streaming_appliers(
wsrep::view db::server_service::get_view(wsrep::client_service&,
const wsrep::id& own_id)
{
int const my_idx(logged_view_.member_index(own_id));
wsrep::view stored_view(server_.storage_engine().get_view());
int const my_idx(stored_view.member_index(own_id));
wsrep::view my_view(
logged_view_.state_id(),
logged_view_.view_seqno(),
logged_view_.status(),
logged_view_.capabilities(),
stored_view.state_id(),
stored_view.view_seqno(),
stored_view.status(),
stored_view.capabilities(),
my_idx,
logged_view_.protocol_version(),
logged_view_.members()
stored_view.protocol_version(),
stored_view.members()
);
return my_view;
}

wsrep::gtid db::server_service::get_position(wsrep::client_service&)
{
throw wsrep::not_implemented_error();
return server_.storage_engine().get_position();
}

void db::server_service::log_state_change(
Expand Down
1 change: 0 additions & 1 deletion dbsim/db_server_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ namespace db
void debug_sync(const char*) override;
private:
db::server& server_;
wsrep::view logged_view_;
};
}

Expand Down
8 changes: 8 additions & 0 deletions dbsim/db_simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ void db::simulator::sst(db::server& server,
wsrep::log_info() << "SST "
<< server.server_state().name()
<< " -> " << request;
i->second->storage_engine().store_position(gtid);
i->second->storage_engine().store_view(
server.storage_engine().get_view());
}

db::client dummy(*(i->second), wsrep::client_id(-1),
Expand Down Expand Up @@ -146,11 +149,16 @@ void db::simulator::start()
{
throw wsrep::runtime_error("Failed to connect");
}
wsrep::log_debug() << "main: Starting applier";
server.start_applier();
wsrep::log_debug() << "main: Waiting initializing state";
server.server_state().wait_until_state(wsrep::server_state::s_initializing);
wsrep::log_debug() << "main: Calling initialized";
server.server_state().initialized();
wsrep::log_debug() << "main: Waiting for synced state";
server.server_state().wait_until_state(
wsrep::server_state::s_synced);
wsrep::log_debug() << "main: Server synced";
}

// Start client threads
Expand Down
38 changes: 37 additions & 1 deletion dbsim/db_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ void db::storage_engine::transaction::apply(
se_.bf_abort_some(transaction);
}

void db::storage_engine::transaction::commit()
void db::storage_engine::transaction::commit(const wsrep::gtid& gtid)
{
if (cc_)
{
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
se_.transactions_.erase(cc_);
se_.store_position(gtid);
}
cc_ = nullptr;
}
Expand Down Expand Up @@ -80,3 +81,38 @@ void db::storage_engine::bf_abort_some(const wsrep::transaction& txc)
}
}
}

void db::storage_engine::store_position(const wsrep::gtid& gtid)
{
validate_position(gtid);
position_ = gtid;
}

wsrep::gtid db::storage_engine::get_position() const
{
return position_;
}

void db::storage_engine::store_view(const wsrep::view& view)
{
view_ = view;
}

wsrep::view db::storage_engine::get_view() const
{
return view_;
}

void db::storage_engine::validate_position(const wsrep::gtid& gtid) const
{
using std::rel_ops::operator<=;
if (position_.id() == gtid.id() && gtid.seqno() <= position_.seqno())
{
std::ostringstream os;
os << "Invalid position submitted, position seqno "
<< position_.seqno()
<< " is greater than submitted seqno "
<< gtid.seqno();
throw wsrep::runtime_error(os.str());
}
}
11 changes: 10 additions & 1 deletion dbsim/db_storage_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace db
, transactions_()
, alg_freq_(params.alg_freq)
, bf_aborts_()
, position_()
, view_()
{ }

class transaction
Expand All @@ -55,7 +57,7 @@ namespace db
bool active() const { return cc_ != nullptr; }
void start(client* cc);
void apply(const wsrep::transaction&);
void commit();
void commit(const wsrep::gtid&);
void rollback();
db::client* client() { return cc_; }
transaction(const transaction&) = delete;
Expand All @@ -66,11 +68,18 @@ namespace db
};
void bf_abort_some(const wsrep::transaction& tc);
long long bf_aborts() const { return bf_aborts_; }
void store_position(const wsrep::gtid& gtid);
wsrep::gtid get_position() const;
void store_view(const wsrep::view& view);
wsrep::view get_view() const;
private:
void validate_position(const wsrep::gtid& gtid) const;
wsrep::default_mutex mutex_;
std::unordered_set<db::client*> transactions_;
size_t alg_freq_;
std::atomic<long long> bf_aborts_;
wsrep::gtid position_;
wsrep::view view_;
};
}

Expand Down

0 comments on commit f30d9c0

Please sign in to comment.