Skip to content

Commit

Permalink
Refactored checks for transaction state before certification
Browse files Browse the repository at this point in the history
Moved the check for transaction state before certification step
into separate method abort_or_interrupted() which will check the state
and adjust state and client_state error status accordingly.

Moved the check for abort_or_interrupted() to happen before
the state is changed to certifying and write set data is appended.
This makes the check atomic and reduces the probability of race
conditions. After this check we rely on provider side transaction
state management and error reporting until the certification step
is over.

Change to public API: Pass client_state mutex wrappend in unique_lock
object to client_service::interrupted() call. This way the DBMS side
has a control to the lock object in case it needs to unlock it
temporarily. The underlying mutex will always be locked when the lock
object is passed via interrupted() call.

Other: Allow server_state change from donor to connected. This may
happen if the joiner crashes during SST and the provider reports
it before the DBMS side SST mechanism detects the error.
  • Loading branch information
temeo committed Feb 19, 2019
1 parent ab0e5f5 commit 49deb7d
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 41 deletions.
4 changes: 3 additions & 1 deletion dbsim/db_client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ namespace db
public:
client_service(db::client& client);

bool interrupted() const override { return false; }
bool interrupted(wsrep::unique_lock<wsrep::mutex>&)
const override
{ return false; }
void reset_globals() override { }
void store_globals() override { }
int prepare_data_for_replication() override
Expand Down
7 changes: 5 additions & 2 deletions include/wsrep/client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ namespace wsrep

/**
* Return true if the current transaction has been interrupted
* by the DBMS.
* by the DBMS. The lock which is passed to interrupted call
* will always have underlying mutex locked.
*
* @param lock Lock object grabbed by the client_state
*/
virtual bool interrupted() const = 0;
virtual bool interrupted(wsrep::unique_lock<wsrep::mutex>& lock) const = 0;

/**
* Reset possible global or thread local parameters associated
Expand Down
6 changes: 6 additions & 0 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ namespace wsrep

wsrep::provider& provider();
void flags(int flags) { flags_ = flags; }
// Return true if the transaction must abort, is aborting,
// or has been aborted, or has been interrupted by DBMS
// as indicated by client_service::interrupted() call.
// The call will adjust transaction state and set client_state
// error status accordingly.
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
int streaming_step(wsrep::unique_lock<wsrep::mutex>&);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
Expand Down
2 changes: 1 addition & 1 deletion src/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ void wsrep::server_state::state(
{ 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */
{ 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */
{ 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */
{ 1, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */
{ 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */
{ 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */
{ 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */
};
Expand Down
75 changes: 39 additions & 36 deletions src/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -983,24 +983,49 @@ void wsrep::transaction::state(
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
{ 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */
};
if (allowed[state_][next_state])
{
state_hist_.push_back(state_);
if (state_hist_.size() == 12)
{
state_hist_.erase(state_hist_.begin());
}
state_ = next_state;
}
else

if (!allowed[state_][next_state])
{
std::ostringstream os;
os << "unallowed state transition for transaction "
<< id_ << ": " << wsrep::to_string(state_)
<< " -> " << wsrep::to_string(next_state);
wsrep::log_error() << os.str();
throw wsrep::runtime_error(os.str());
wsrep::log_warning() << os.str();
assert(0);
}

state_hist_.push_back(state_);
if (state_hist_.size() == 12)
{
state_hist_.erase(state_hist_.begin());
}
state_ = next_state;
}

bool wsrep::transaction::abort_or_interrupt(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
if (state() == s_must_abort)
{
client_state_.override_error(wsrep::e_deadlock_error);
return true;
}
else if (state() == s_aborting || state() == s_aborted)
{
assert(client_state_.current_error());
return true;
}
else if (client_service_.interrupted(lock))
{
client_state_.override_error(wsrep::e_interrupted_error);
if (state() != s_must_abort)
{
state(lock, s_must_abort);
}
return true;
}
return false;
}

int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock)
Expand Down Expand Up @@ -1052,24 +1077,14 @@ int wsrep::transaction::certify_fragment(
state() == s_must_abort);

client_service_.wait_for_replayers(lock);
if (state() == s_must_abort)
if (abort_or_interrupt(lock))
{
client_state_.override_error(wsrep::e_deadlock_error);
return 1;
}

state(lock, s_certifying);

lock.unlock();

if (client_service_.interrupted())
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_interrupted_error);
return 1;
}

wsrep::mutable_buffer data;
if (client_service_.prepare_fragment_for_replication(data))
{
Expand Down Expand Up @@ -1272,9 +1287,8 @@ int wsrep::transaction::certify_commit(

assert(lock.owns_lock());

if (state() == s_must_abort)
if (abort_or_interrupt(lock))
{
client_state_.override_error(wsrep::e_deadlock_error);
return 1;
}

Expand Down Expand Up @@ -1310,17 +1324,6 @@ int wsrep::transaction::certify_commit(
return 1;
}

if (client_service_.interrupted())
{
lock.lock();
client_state_.override_error(wsrep::e_interrupted_error);
if (state_ != s_must_abort)
{
state(lock, s_must_abort);
}
return 1;
}

client_service_.debug_sync("wsrep_before_certification");
enum wsrep::provider::status
cert_ret(provider().certify(client_state_.id(),
Expand Down
3 changes: 2 additions & 1 deletion test/mock_client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ namespace wsrep

int bf_rollback() WSREP_OVERRIDE;

bool interrupted() const WSREP_OVERRIDE
bool interrupted(wsrep::unique_lock<wsrep::mutex>&)
const WSREP_OVERRIDE
{ return killed_before_certify_; }


Expand Down

0 comments on commit 49deb7d

Please sign in to comment.