From 49deb7da98930ee4f92a0854fbe0e1314e9387a0 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Tue, 19 Feb 2019 22:26:45 +0200 Subject: [PATCH] Refactored checks for transaction state before certification Moved the check for transaction state before certification step into separate method abort_or_interrupted() which will check the state and adjust state and client_state error status accordingly. Moved the check for abort_or_interrupted() to happen before the state is changed to certifying and write set data is appended. This makes the check atomic and reduces the probability of race conditions. After this check we rely on provider side transaction state management and error reporting until the certification step is over. Change to public API: Pass client_state mutex wrappend in unique_lock object to client_service::interrupted() call. This way the DBMS side has a control to the lock object in case it needs to unlock it temporarily. The underlying mutex will always be locked when the lock object is passed via interrupted() call. Other: Allow server_state change from donor to connected. This may happen if the joiner crashes during SST and the provider reports it before the DBMS side SST mechanism detects the error. --- dbsim/db_client_service.hpp | 4 +- include/wsrep/client_service.hpp | 7 ++- include/wsrep/transaction.hpp | 6 +++ src/server_state.cpp | 2 +- src/transaction.cpp | 75 +++++++++++++++++--------------- test/mock_client_state.hpp | 3 +- 6 files changed, 56 insertions(+), 41 deletions(-) diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index b2c751ac..1239f92f 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -33,7 +33,9 @@ namespace db public: client_service(db::client& client); - bool interrupted() const override { return false; } + bool interrupted(wsrep::unique_lock&) + const override + { return false; } void reset_globals() override { } void store_globals() override { } int prepare_data_for_replication() override diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 3a455d4e..5ca04fea 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -43,9 +43,12 @@ namespace wsrep /** * Return true if the current transaction has been interrupted - * by the DBMS. + * by the DBMS. The lock which is passed to interrupted call + * will always have underlying mutex locked. + * + * @param lock Lock object grabbed by the client_state */ - virtual bool interrupted() const = 0; + virtual bool interrupted(wsrep::unique_lock& lock) const = 0; /** * Reset possible global or thread local parameters associated diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index eb31a0d1..5284180b 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -197,6 +197,12 @@ namespace wsrep wsrep::provider& provider(); void flags(int flags) { flags_ = flags; } + // Return true if the transaction must abort, is aborting, + // or has been aborted, or has been interrupted by DBMS + // as indicated by client_service::interrupted() call. + // The call will adjust transaction state and set client_state + // error status accordingly. + bool abort_or_interrupt(wsrep::unique_lock&); int streaming_step(wsrep::unique_lock&); int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); diff --git a/src/server_state.cpp b/src/server_state.cpp index 4d92ccf9..b7a13c1b 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -1153,7 +1153,7 @@ void wsrep::server_state::state( { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */ { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */ { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */ - { 1, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */ + { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */ { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */ { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ }; diff --git a/src/transaction.cpp b/src/transaction.cpp index fb0d9205..2152b944 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -983,24 +983,49 @@ void wsrep::transaction::state( { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ { 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ }; - if (allowed[state_][next_state]) - { - state_hist_.push_back(state_); - if (state_hist_.size() == 12) - { - state_hist_.erase(state_hist_.begin()); - } - state_ = next_state; - } - else + + if (!allowed[state_][next_state]) { std::ostringstream os; os << "unallowed state transition for transaction " << id_ << ": " << wsrep::to_string(state_) << " -> " << wsrep::to_string(next_state); - wsrep::log_error() << os.str(); - throw wsrep::runtime_error(os.str()); + wsrep::log_warning() << os.str(); + assert(0); + } + + state_hist_.push_back(state_); + if (state_hist_.size() == 12) + { + state_hist_.erase(state_hist_.begin()); + } + state_ = next_state; +} + +bool wsrep::transaction::abort_or_interrupt( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + return true; + } + else if (state() == s_aborting || state() == s_aborted) + { + assert(client_state_.current_error()); + return true; + } + else if (client_service_.interrupted(lock)) + { + client_state_.override_error(wsrep::e_interrupted_error); + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + return true; } + return false; } int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) @@ -1052,24 +1077,14 @@ int wsrep::transaction::certify_fragment( state() == s_must_abort); client_service_.wait_for_replayers(lock); - if (state() == s_must_abort) + if (abort_or_interrupt(lock)) { - client_state_.override_error(wsrep::e_deadlock_error); return 1; } state(lock, s_certifying); - lock.unlock(); - if (client_service_.interrupted()) - { - lock.lock(); - state(lock, s_must_abort); - client_state_.override_error(wsrep::e_interrupted_error); - return 1; - } - wsrep::mutable_buffer data; if (client_service_.prepare_fragment_for_replication(data)) { @@ -1272,9 +1287,8 @@ int wsrep::transaction::certify_commit( assert(lock.owns_lock()); - if (state() == s_must_abort) + if (abort_or_interrupt(lock)) { - client_state_.override_error(wsrep::e_deadlock_error); return 1; } @@ -1310,17 +1324,6 @@ int wsrep::transaction::certify_commit( return 1; } - if (client_service_.interrupted()) - { - lock.lock(); - client_state_.override_error(wsrep::e_interrupted_error); - if (state_ != s_must_abort) - { - state(lock, s_must_abort); - } - return 1; - } - client_service_.debug_sync("wsrep_before_certification"); enum wsrep::provider::status cert_ret(provider().certify(client_state_.id(), diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index af25fd57..899ca683 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -78,7 +78,8 @@ namespace wsrep int bf_rollback() WSREP_OVERRIDE; - bool interrupted() const WSREP_OVERRIDE + bool interrupted(wsrep::unique_lock&) + const WSREP_OVERRIDE { return killed_before_certify_; }