diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 02eefd34..21f368de 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -28,6 +28,8 @@ namespace db void after_apply() override; void store_globals() override { } void reset_globals() override { } + void switch_execution_context(wsrep::high_priority_service&) override + { } int log_dummy_write_set(const wsrep::ws_handle&, const wsrep::ws_meta&) { return 0; } diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index bf53d653..abd8d5f0 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -25,7 +25,14 @@ void db::server_service::release_storage_service( delete storage_service; } -wsrep::high_priority_service* db::server_service::streaming_applier_service() +wsrep::high_priority_service* db::server_service::streaming_applier_service( + wsrep::client_service&) +{ + return server_.streaming_applier_service(); +} + +wsrep::high_priority_service* db::server_service::streaming_applier_service( + wsrep::high_priority_service&) { return server_.streaming_applier_service(); } diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index 77590cce..b080035e 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -17,7 +17,8 @@ namespace db server_service(db::server& server); wsrep::storage_service* storage_service(wsrep::client_service&) override; void release_storage_service(wsrep::storage_service*) override; - wsrep::high_priority_service* streaming_applier_service() override; + wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override; + wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override; void release_high_priority_service(wsrep::high_priority_service*) override; bool sst_before_init() const override; diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index e7025445..7a05a1dd 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -80,9 +80,24 @@ namespace wsrep */ virtual void after_apply() = 0; + /** + * Store global execution context for high priority service. + */ virtual void store_globals() = 0; + + /** + * Reset global execution context for high priority service. + */ virtual void reset_globals() = 0; + /** + * Switch exection context to context of orig_hps. + * + * @param orig_hps Original high priority service. + */ + virtual void switch_execution_context( + wsrep::high_priority_service& orig_hps) = 0; + virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0; virtual bool is_replaying() const = 0; @@ -102,6 +117,7 @@ namespace wsrep , current_service_(current_service) { orig_service_.reset_globals(); + current_service_.switch_execution_context(orig_service_); current_service_.store_globals(); } ~high_priority_switch() diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 81a13f1e..a3b9932b 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -34,13 +34,30 @@ namespace wsrep wsrep::client_service&) = 0; virtual void release_storage_service(wsrep::storage_service*) = 0; + + /** + * Create an applier state for streaming transaction applying. + * + * @param orig_cs Reference to client service which is + * requesting a new streaming applier service + * instance. + * + * @return Pointer to streaming applier client state. + */ + virtual wsrep::high_priority_service* + streaming_applier_service(wsrep::client_service& orig_cs) = 0; + /** * Create an applier state for streaming transaction applying. * + * @param orig_hps Reference to high priority service which is + * requesting a new streaming applier service + * instance. + * * @return Pointer to streaming applier client state. */ virtual wsrep::high_priority_service* - streaming_applier_service() = 0; + streaming_applier_service(wsrep::high_priority_service& orig_hps) = 0; /** * Release a client state allocated by either local_client_state() diff --git a/include/wsrep/transaction_id.hpp b/include/wsrep/transaction_id.hpp index 066f2004..f373283d 100644 --- a/include/wsrep/transaction_id.hpp +++ b/include/wsrep/transaction_id.hpp @@ -25,6 +25,7 @@ namespace wsrep { } type get() const { return id_; } static unsigned long long undefined() { return type(-1); } + bool is_undefined() const { return (id_ == type(-1)); } bool operator<(const transaction_id& other) const { return (id_ < other.id_); diff --git a/src/server_state.cpp b/src/server_state.cpp index d5c5d556..b0d75a37 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -61,7 +61,8 @@ namespace assert(server_state.find_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()) == 0); wsrep::high_priority_service* sa( - server_state.server_service().streaming_applier_service()); + server_state.server_service().streaming_applier_service( + high_priority_service)); server_state.start_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id(), sa); sa->start_transaction(ws_handle, ws_meta); @@ -84,14 +85,13 @@ namespace // commit fragment comes in. Although this is a valid // situation, log a warning if a sac cannot be found as // it may be an indication of a bug too. - assert(0); wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() << ": " << ws_meta.transaction_id(); } else { - wsrep::high_priority_switch(high_priority_service, *sa); + wsrep::high_priority_switch sw(high_priority_service, *sa); ret = sa->apply_write_set(data); sa->after_apply(); } @@ -117,7 +117,6 @@ namespace // commit fragment comes in. Although this is a valid // situation, log a warning if a sac cannot be found as // it may be an indication of a bug too. - assert(0); wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() @@ -125,9 +124,14 @@ namespace } else { - wsrep::high_priority_switch(high_priority_service, *sa); - ret = sa->commit(); - sa->after_apply(); + // Make high priority switch to go out of scope + // before the streaming applier is released. + { + wsrep::high_priority_switch sw( + high_priority_service, *sa); + ret = sa->commit(); + sa->after_apply(); + } server_state.stop_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()); server_state.server_service().release_high_priority_service(sa); diff --git a/src/transaction.cpp b/src/transaction.cpp index 0c7f337d..5886052c 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -1080,7 +1080,8 @@ void wsrep::transaction::streaming_rollback() { assert(streaming_context_.rolled_back() == false); wsrep::high_priority_service* sa( - server_service_.streaming_applier_service()); + server_service_.streaming_applier_service( + client_state_.client_service())); client_state_.server_state().start_streaming_applier( client_state_.server_state().id(), id(), sa); sa->adopt_transaction(*this); diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index 2d3b760c..525e7b01 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -43,6 +43,8 @@ namespace wsrep void after_apply() WSREP_OVERRIDE; void store_globals() WSREP_OVERRIDE { } void reset_globals() WSREP_OVERRIDE { } + void switch_execution_context(wsrep::high_priority_service&) + WSREP_OVERRIDE { } int log_dummy_write_set(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 536c281d..77f1f3cd 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -63,7 +63,21 @@ namespace wsrep delete client_state; } - wsrep::high_priority_service* streaming_applier_service() + wsrep::high_priority_service* streaming_applier_service( + wsrep::client_service&) + { + wsrep::mock_client* cs(new wsrep::mock_client( + *this, ++last_client_id_, + wsrep::client_state::m_high_priority)); + wsrep::mock_high_priority_service* ret( + new wsrep::mock_high_priority_service(*this, cs, false)); + cs->open(cs->id()); + cs->before_command(); + return ret; + } + + wsrep::high_priority_service* streaming_applier_service( + wsrep::high_priority_service&) { wsrep::mock_client* cs(new wsrep::mock_client( *this, ++last_client_id_,