From 769cd672b80f8855bb389ea5651c440d62e003b0 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 16 Sep 2019 18:03:01 -0400 Subject: [PATCH] Refactor producer plugin start_block into a few methods to make the logic easier to follow --- plugins/producer_plugin/producer_plugin.cpp | 486 ++++++++++---------- 1 file changed, 253 insertions(+), 233 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index a9cfab68cb..a8b4f87a6c 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -202,8 +202,13 @@ class producer_plugin_impl : public std::enable_shared_from_this(); - auto& persisted_by_expiry = _persistent_transactions.get(); - if (!persisted_by_expiry.empty()) { - int num_expired_persistent = 0; - int orig_count = _persistent_transactions.size(); - - while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pending_block_time) { - if (preprocess_deadline <= fc::time_point::now()) { - exhausted = true; - break; - } - auto const& txid = persisted_by_expiry.begin()->trx_id; - if (_pending_block_mode == pending_block_mode::producing) { - fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}", - ("block_num", chain.head_block_num() + 1) - ("prod", chain.pending_block_producer()) - ("txid", txid)); - } else { - fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${txid}", - ("txid", txid)); - } + remove_expired_persisted_trxs( preprocess_deadline ); + remove_expired_blacklisted_trxs( preprocess_deadline ); + + try { + size_t orig_pending_txn_size = _pending_incoming_transactions.size(); + + // attempt to play persisted transactions first + process_unapplied_trxs( preprocess_deadline ); - persisted_by_expiry.erase(persisted_by_expiry.begin()); - num_expired_persistent++; + if (_pending_block_mode == pending_block_mode::producing) { + process_scheduled_and_incoming_trxs( preprocess_deadline, orig_pending_txn_size ); } - if( exhausted ) { - fc_wlog( _log, "Unable to process all ${n} persisted transactions before deadline, Expired ${expired}", - ( "n", orig_count ) - ( "expired", num_expired_persistent ) ); + if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() + return start_block_result::failed; + if( preprocess_deadline <= fc::time_point::now() ) { + return start_block_result::exhausted; } else { - fc_dlog( _log, "Processed ${n} persisted transactions, Expired ${expired}", - ( "n", orig_count ) - ( "expired", num_expired_persistent ) ); + _incoming_trx_weight = 0.0; + + // attempt to apply any pending incoming transactions + process_incoming_trxs( preprocess_deadline, orig_pending_txn_size ); + if (preprocess_deadline <= fc::time_point::now()) return start_block_result::exhausted; + return start_block_result::succeeded; } - } - try { - size_t orig_pending_txn_size = _pending_incoming_transactions.size(); + } catch ( const guard_exception& e ) { + chain_plugin::handle_guard_exception(e); + return start_block_result::failed; + } catch ( std::bad_alloc& ) { + chain_plugin::handle_bad_alloc(); + } catch ( boost::interprocess::bad_alloc& ) { + chain_plugin::handle_db_exhaustion(); + } + } - // Processing unapplied transactions... - // - if (_producers.empty() && persisted_by_id.empty()) { - // if this node can never produce and has no persisted transactions, - // there is no need for unapplied transactions they can be dropped - chain.get_unapplied_transactions().clear(); - } else { - // derive appliable transactions from unapplied_transactions and drop droppable transactions - unapplied_transactions_type& unapplied_trxs = chain.get_unapplied_transactions(); - if( !unapplied_trxs.empty() ) { - auto unapplied_trxs_size = unapplied_trxs.size(); - int num_applied = 0; - int num_failed = 0; - int num_processed = 0; - auto calculate_transaction_category = [&](const transaction_metadata_ptr& trx) { - if (trx->packed_trx->expiration() < pending_block_time) { - return tx_category::EXPIRED; - } else if (persisted_by_id.find(trx->id) != persisted_by_id.end()) { - return tx_category::PERSISTED; - } else { - return tx_category::UNEXPIRED_UNPERSISTED; - } - }; - - auto itr = unapplied_trxs.begin(); - while( itr != unapplied_trxs.end() ) { - auto itr_next = itr; // save off next since itr may be invalidated by loop - ++itr_next; - - if( preprocess_deadline <= fc::time_point::now() ) exhausted = true; - if( exhausted ) break; - const transaction_metadata_ptr trx = itr->second; - auto category = calculate_transaction_category(trx); - if (category == tx_category::EXPIRED || - (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) - { - if (!_producers.empty()) { - fc_dlog(_trx_trace_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}", - ("txid", trx->id)); - } - itr = unapplied_trxs.erase( itr ); // unapplied_trxs map has not been modified, so simply erase and continue - continue; - } else if (category == tx_category::PERSISTED || - (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) - { - ++num_processed; - - try { - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); - bool deadline_is_subjective = false; - if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && preprocess_deadline < deadline)) { - deadline_is_subjective = true; - deadline = preprocess_deadline; - } - - auto trace = chain.push_transaction(trx, deadline); - if (trace->except) { - if (failure_is_subjective(*trace->except, deadline_is_subjective)) { - exhausted = true; - break; - } else { - // this failed our configured maximum transaction time, we don't want to replay it - // chain.plus_transactions can modify unapplied_trxs, so erase by id - unapplied_trxs.erase( trx->signed_id ); - ++num_failed; - } - } else { - ++num_applied; - } - } LOG_AND_DROP(); - } + return start_block_result::failed; +} - itr = itr_next; - } +void producer_plugin_impl::remove_expired_persisted_trxs( const fc::time_point& deadline ) +{ + auto& persisted_by_expiry = _persistent_transactions.get(); + if (!persisted_by_expiry.empty()) { + chain::controller& chain = chain_plug->chain(); + int num_expired_persistent = 0; + int orig_count = _persistent_transactions.size(); - fc_dlog(_log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}", - ("m", num_processed) - ("n", unapplied_trxs_size) - ("applied", num_applied) - ("failed", num_failed)); - } + bool exhausted = false; + const time_point pending_block_time = chain.pending_block_time(); + while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pending_block_time) { + if (deadline <= fc::time_point::now()) { + exhausted = true; + break; } - + auto const& txid = persisted_by_expiry.begin()->trx_id; if (_pending_block_mode == pending_block_mode::producing) { - auto& blacklist_by_id = _blacklisted_transactions.get(); - auto& blacklist_by_expiry = _blacklisted_transactions.get(); - auto now = fc::time_point::now(); - if(!blacklist_by_expiry.empty()) { - int num_expired = 0; - int orig_count = _blacklisted_transactions.size(); - - while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) { - if (preprocess_deadline <= fc::time_point::now()) break; - blacklist_by_expiry.erase(blacklist_by_expiry.begin()); - num_expired++; - } - - fc_dlog(_log, "Processed ${n} blacklisted transactions, Expired ${expired}", - ("n", orig_count) - ("expired", num_expired)); - } + fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}", + ("block_num", chain.head_block_num() + 1) + ("prod", chain.pending_block_producer()) + ("txid", txid)); + } else { + fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${txid}", + ("txid", txid)); + } - // scheduled transactions - int num_applied = 0; - int num_failed = 0; - int num_processed = 0; - - auto scheduled_trx_deadline = preprocess_deadline; - if (_max_scheduled_transaction_time_per_block_ms >= 0) { - scheduled_trx_deadline = std::min( - scheduled_trx_deadline, - fc::time_point::now() + fc::milliseconds(_max_scheduled_transaction_time_per_block_ms) - ); - } - time_point pending_block_time = chain.pending_block_time(); - const auto& sch_idx = chain.db().get_index(); - const auto scheduled_trxs_size = sch_idx.size(); - auto sch_itr = sch_idx.begin(); - while( sch_itr != sch_idx.end() ) { - if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet - if( sch_itr->published >= pending_block_time ) { - ++sch_itr; - continue; // do not allow schedule and execute in same block - } - if( scheduled_trx_deadline <= fc::time_point::now() ) { - exhausted = true; - break; - } + persisted_by_expiry.erase(persisted_by_expiry.begin()); + num_expired_persistent++; + } - const transaction_id_type trx_id = sch_itr->trx_id; // make copy since reference could be invalidated - if (blacklist_by_id.find(trx_id) != blacklist_by_id.end()) { - ++sch_itr; - continue; - } + if( exhausted ) { + fc_wlog( _log, "Unable to process all ${n} persisted transactions before deadline, Expired ${expired}", + ( "n", orig_count ) + ( "expired", num_expired_persistent ) ); + } else { + fc_dlog( _log, "Processed ${n} persisted transactions, Expired ${expired}", + ( "n", orig_count ) + ( "expired", num_expired_persistent ) ); + } + } +} - auto sch_itr_next = sch_itr; // save off next since sch_itr may be invalidated by loop - ++sch_itr_next; - const auto next_delay_until = sch_itr_next != sch_idx.end() ? sch_itr_next->delay_until : sch_itr->delay_until; - const auto next_id = sch_itr_next != sch_idx.end() ? sch_itr_next->id : sch_itr->id; +void producer_plugin_impl::remove_expired_blacklisted_trxs( const fc::time_point& deadline ) +{ + auto& blacklist_by_expiry = _blacklisted_transactions.get(); + auto now = fc::time_point::now(); + if(!blacklist_by_expiry.empty()) { + int num_expired = 0; + int orig_count = _blacklisted_transactions.size(); + + while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) { + if (deadline <= fc::time_point::now()) break; + blacklist_by_expiry.erase(blacklist_by_expiry.begin()); + num_expired++; + } - num_processed++; + fc_dlog( _log, "Processed ${n} blacklisted transactions, Expired ${expired}", + ("n", orig_count)("expired", num_expired) ); + } +} - // configurable ratio of incoming txns vs deferred txns - while (_incoming_trx_weight >= 1.0 && orig_pending_txn_size && _pending_incoming_transactions.size()) { - if (scheduled_trx_deadline <= fc::time_point::now()) break; +void producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadline ) +{ + chain::controller& chain = chain_plug->chain(); + auto& persisted_by_id = _persistent_transactions.get(); - auto e = _pending_incoming_transactions.front(); - _pending_incoming_transactions.pop_front(); - --orig_pending_txn_size; - _incoming_trx_weight -= 1.0; - process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - } + // Processing unapplied transactions... + // + if (_producers.empty() && persisted_by_id.empty()) { + // if this node can never produce and has no persisted transactions, + // there is no need for unapplied transactions they can be dropped + chain.get_unapplied_transactions().clear(); + } else { + // derive appliable transactions from unapplied_transactions and drop droppable transactions + unapplied_transactions_type& unapplied_trxs = chain.get_unapplied_transactions(); + if( !unapplied_trxs.empty() ) { + const time_point pending_block_time = chain.pending_block_time(); + auto unapplied_trxs_size = unapplied_trxs.size(); + int num_applied = 0; + int num_failed = 0; + int num_processed = 0; + auto calculate_transaction_category = [&](const transaction_metadata_ptr& trx) { + if (trx->packed_trx->expiration() < pending_block_time) { + return tx_category::EXPIRED; + } else if (persisted_by_id.find(trx->id) != persisted_by_id.end()) { + return tx_category::PERSISTED; + } else { + return tx_category::UNEXPIRED_UNPERSISTED; + } + }; - if (scheduled_trx_deadline <= fc::time_point::now()) { - exhausted = true; - break; + auto itr = unapplied_trxs.begin(); + while( itr != unapplied_trxs.end() ) { + auto itr_next = itr; // save off next since itr may be invalidated by loop + ++itr_next; + + if( deadline <= fc::time_point::now() ) break; + const transaction_metadata_ptr trx = itr->second; + auto category = calculate_transaction_category(trx); + if (category == tx_category::EXPIRED || + (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) + { + if (!_producers.empty()) { + fc_dlog(_trx_trace_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}", + ("txid", trx->id)); } + itr = unapplied_trxs.erase( itr ); // unapplied_trxs map has not been modified, so simply erase and continue + continue; + } else if (category == tx_category::PERSISTED || + (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) + { + ++num_processed; try { - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + auto trx_deadline = fc::time_point::now() + fc::milliseconds( _max_transaction_time_ms ); bool deadline_is_subjective = false; - if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && scheduled_trx_deadline < deadline)) { + if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && deadline < trx_deadline)) { deadline_is_subjective = true; - deadline = scheduled_trx_deadline; + trx_deadline = deadline; } - auto trace = chain.push_scheduled_transaction(trx_id, deadline); + auto trace = chain.push_transaction(trx, trx_deadline); if (trace->except) { if (failure_is_subjective(*trace->except, deadline_is_subjective)) { - exhausted = true; break; } else { - auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); - // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist - _blacklisted_transactions.insert(transaction_id_with_expiry{trx_id, expiration}); - num_failed++; + // this failed our configured maximum transaction time, we don't want to replay it + // chain.plus_transactions can modify unapplied_trxs, so erase by id + unapplied_trxs.erase( trx->signed_id ); + ++num_failed; } } else { - num_applied++; + ++num_applied; } } LOG_AND_DROP(); + } - _incoming_trx_weight += _incoming_defer_ratio; - if (!orig_pending_txn_size) _incoming_trx_weight = 0.0; + itr = itr_next; + } - if( sch_itr_next == sch_idx.end() ) break; - sch_itr = sch_idx.lower_bound( boost::make_tuple( next_delay_until, next_id ) ); - } + fc_dlog( _log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}", + ("m", num_processed)("n", unapplied_trxs_size)("applied", num_applied)("failed", num_failed) ); + } + } +} - if( scheduled_trxs_size > 0 ) { - fc_dlog( _log, - "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", - ( "m", num_processed ) - ( "n", scheduled_trxs_size ) - ( "applied", num_applied ) - ( "failed", num_failed ) ); - } +void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& orig_pending_txn_size ) +{ + chain::controller& chain = chain_plug->chain(); + const time_point pending_block_time = chain.pending_block_time(); + auto& blacklist_by_id = _blacklisted_transactions.get(); + + // scheduled transactions + int num_applied = 0; + int num_failed = 0; + int num_processed = 0; + + auto scheduled_trx_deadline = deadline; + if (_max_scheduled_transaction_time_per_block_ms >= 0) { + scheduled_trx_deadline = std::min( + scheduled_trx_deadline, + fc::time_point::now() + fc::milliseconds(_max_scheduled_transaction_time_per_block_ms) + ); + } + const auto& sch_idx = chain.db().get_index(); + const auto scheduled_trxs_size = sch_idx.size(); + auto sch_itr = sch_idx.begin(); + while( sch_itr != sch_idx.end() ) { + if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet + if( sch_itr->published >= pending_block_time ) { + ++sch_itr; + continue; // do not allow schedule and execute in same block + } + if( scheduled_trx_deadline <= fc::time_point::now() ) { + break; + } - } + const transaction_id_type trx_id = sch_itr->trx_id; // make copy since reference could be invalidated + if (blacklist_by_id.find(trx_id) != blacklist_by_id.end()) { + ++sch_itr; + continue; + } - if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() - return start_block_result::failed; - if (exhausted || preprocess_deadline <= fc::time_point::now()) { - return start_block_result::exhausted; - } else { - // attempt to apply any pending incoming transactions - _incoming_trx_weight = 0.0; + auto sch_itr_next = sch_itr; // save off next since sch_itr may be invalidated by loop + ++sch_itr_next; + const auto next_delay_until = sch_itr_next != sch_idx.end() ? sch_itr_next->delay_until : sch_itr->delay_until; + const auto next_id = sch_itr_next != sch_idx.end() ? sch_itr_next->id : sch_itr->id; - if (!_pending_incoming_transactions.empty()) { - fc_dlog(_log, "Processing ${n} pending transactions", ("n", _pending_incoming_transactions.size())); - while (orig_pending_txn_size && _pending_incoming_transactions.size()) { - if (preprocess_deadline <= fc::time_point::now()) return start_block_result::exhausted; - auto e = _pending_incoming_transactions.front(); - _pending_incoming_transactions.pop_front(); - --orig_pending_txn_size; - process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - } + num_processed++; + + // configurable ratio of incoming txns vs deferred txns + while (_incoming_trx_weight >= 1.0 && orig_pending_txn_size && _pending_incoming_transactions.size()) { + if (scheduled_trx_deadline <= fc::time_point::now()) break; + + auto e = _pending_incoming_transactions.front(); + _pending_incoming_transactions.pop_front(); + --orig_pending_txn_size; + _incoming_trx_weight -= 1.0; + process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); + } + + if (scheduled_trx_deadline <= fc::time_point::now()) { + break; + } + + try { + auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + bool deadline_is_subjective = false; + if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && scheduled_trx_deadline < deadline)) { + deadline_is_subjective = true; + deadline = scheduled_trx_deadline; + } + + auto trace = chain.push_scheduled_transaction(trx_id, deadline); + if (trace->except) { + if (failure_is_subjective(*trace->except, deadline_is_subjective)) { + break; + } else { + auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); + // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist + _blacklisted_transactions.insert(transaction_id_with_expiry{trx_id, expiration}); + num_failed++; } - return start_block_result::succeeded; + } else { + num_applied++; } + } LOG_AND_DROP(); - } catch ( const guard_exception& e ) { - chain_plugin::handle_guard_exception(e); - return start_block_result::failed; - } catch ( std::bad_alloc& ) { - chain_plugin::handle_bad_alloc(); - } catch ( boost::interprocess::bad_alloc& ) { - chain_plugin::handle_db_exhaustion(); - } + _incoming_trx_weight += _incoming_defer_ratio; + if (!orig_pending_txn_size) _incoming_trx_weight = 0.0; + if( sch_itr_next == sch_idx.end() ) break; + sch_itr = sch_idx.lower_bound( boost::make_tuple( next_delay_until, next_id ) ); } - return start_block_result::failed; + if( scheduled_trxs_size > 0 ) { + fc_dlog( _log, "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", + ( "m", num_processed )( "n", scheduled_trxs_size )( "applied", num_applied )( "failed", num_failed ) ); + } +} + +void producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline, size_t& orig_pending_txn_size ) +{ + if (!_pending_incoming_transactions.empty()) { + fc_dlog(_log, "Processing ${n} pending transactions", ("n", _pending_incoming_transactions.size())); + while (orig_pending_txn_size && _pending_incoming_transactions.size()) { + if( deadline <= fc::time_point::now() ) break; + auto e = _pending_incoming_transactions.front(); + _pending_incoming_transactions.pop_front(); + --orig_pending_txn_size; + process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); + } + } } void producer_plugin_impl::schedule_production_loop() {