From fe38bcff1eb459bbf0f8c0fb21ecd91fb9c67965 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 25 Sep 2024 22:05:47 +0100 Subject: [PATCH] Streamline prewarming error handling (#7491) --- .../Processing/BlockCachePreWarmer.cs | 218 ++++++++++-------- 1 file changed, 120 insertions(+), 98 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs index 6a6ee6822db..0348e98c2df 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs @@ -69,9 +69,9 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot if (_logger.IsDebug) _logger.Debug($"Finished pre-warming caches for block {suggestedBlock.Number}."); } - catch (OperationCanceledException) + catch (Exception ex) { - if (_logger.IsDebug) _logger.Debug($"Pre-warming caches cancelled for block {suggestedBlock.Number}."); + if (_logger.IsDebug) _logger.Warn($"Error pre-warming {suggestedBlock.Number}. {ex}"); } finally { @@ -83,35 +83,39 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot) { if (parallelOptions.CancellationToken.IsCancellationRequested) return; - if (spec.WithdrawalsEnabled && block.Withdrawals is not null) + + try { - int progress = 0; - Parallel.For(0, block.Withdrawals.Length, parallelOptions, - _ => - { - IReadOnlyTxProcessorSource env = _envPool.Get(); - int i = 0; - try - { - using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); - // Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - i = Interlocked.Increment(ref progress) - 1; - scope.WorldState.WarmUp(block.Withdrawals[i].Address); - } - catch (OperationCanceledException) - { - // Ignore - } - catch (Exception ex) - { - if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal {i}", ex); - } - finally + if (spec.WithdrawalsEnabled && block.Withdrawals is not null) + { + int progress = 0; + Parallel.For(0, block.Withdrawals.Length, parallelOptions, + _ => { - _envPool.Return(env); - } - }); + IReadOnlyTxProcessorSource env = _envPool.Get(); + int i = 0; + try + { + using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); + // Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For + // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 + i = Interlocked.Increment(ref progress) - 1; + scope.WorldState.WarmUp(block.Withdrawals[i].Address); + } + finally + { + _envPool.Return(env); + } + }); + } + } + catch (OperationCanceledException) + { + // Ignore, block completed cancel + } + catch (Exception ex) + { + if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal", ex); } } @@ -119,45 +123,56 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp { if (parallelOptions.CancellationToken.IsCancellationRequested) return; - int progress = 0; - Parallel.For(0, block.Transactions.Length, parallelOptions, _ => + try { - using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority(); - IReadOnlyTxProcessorSource env = _envPool.Get(); - SystemTransaction systemTransaction = _systemTransactionPool.Get(); - Transaction? tx = null; - try + int progress = 0; + Parallel.For(0, block.Transactions.Length, parallelOptions, _ => { - // Process transactions in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - int i = Interlocked.Increment(ref progress) - 1; - // If the transaction has already been processed or being processed, exit early - if (block.TransactionProcessed > i) return; - - tx = block.Transactions[i]; - tx.CopyTo(systemTransaction); - using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); - if (spec.UseTxAccessLists) + using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority(); + IReadOnlyTxProcessorSource env = _envPool.Get(); + SystemTransaction systemTransaction = _systemTransactionPool.Get(); + Transaction? tx = null; + try { - scope.WorldState.WarmUp(tx.AccessList); // eip-2930 + // Process transactions in sequential order, rather than partitioning scheme from Parallel.For + // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 + int i = Interlocked.Increment(ref progress) - 1; + // If the transaction has already been processed or being processed, exit early + if (block.TransactionProcessed > i) return; + + tx = block.Transactions[i]; + tx.CopyTo(systemTransaction); + using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); + if (spec.UseTxAccessLists) + { + scope.WorldState.WarmUp(tx.AccessList); // eip-2930 + } + TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance); + if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); } - TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance); - if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); - } - catch (OperationCanceledException) - { - // Ignore - } - catch (Exception ex) - { - if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex); - } - finally - { - _systemTransactionPool.Return(systemTransaction); - _envPool.Return(env); - } - }); + catch (Exception ex) when (ex is EvmException or OverflowException) + { + // Ignore, regular tx processing exceptions + } + catch (Exception ex) + { + if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex); + } + finally + { + _systemTransactionPool.Return(systemTransaction); + _envPool.Return(env); + } + }); + } + catch (OperationCanceledException) + { + // Ignore, block completed cancel + } + catch (Exception ex) + { + if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal", ex); + } } private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash256 stateRoot, AccessList? systemTxAccessList, BlockCachePreWarmer preWarmer) @@ -173,17 +188,10 @@ private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash25 void IThreadPoolWorkItem.Execute() { - IReadOnlyTxProcessorSource env = null; try { if (parallelOptions.CancellationToken.IsCancellationRequested) return; - env = PreWarmer._envPool.Get(); - using IReadOnlyTxProcessingScope scope = env.Build(StateRoot); - WarmupAddresses(parallelOptions, Block, scope); - } - catch (OperationCanceledException) - { - // Ignore + WarmupAddresses(parallelOptions, Block); } catch (Exception ex) { @@ -191,51 +199,65 @@ void IThreadPoolWorkItem.Execute() } finally { - if (env is not null) PreWarmer._envPool.Return(env); _doneEvent.Set(); } } - private void WarmupAddresses(ParallelOptions parallelOptions, Block block, IReadOnlyTxProcessingScope scope) + private void WarmupAddresses(ParallelOptions parallelOptions, Block block) { if (parallelOptions.CancellationToken.IsCancellationRequested) return; - if (SystemTxAccessList is not null) + try { - scope.WorldState.WarmUp(SystemTxAccessList); - } + if (SystemTxAccessList is not null) + { + var env = PreWarmer._envPool.Get(); + try + { + using IReadOnlyTxProcessingScope scope = env.Build(StateRoot); + scope.WorldState.WarmUp(SystemTxAccessList); + } + finally + { + PreWarmer._envPool.Return(env); + } + } - int progress = 0; - Parallel.For(0, block.Transactions.Length, parallelOptions, - _ => - { - int i = 0; - try + int progress = 0; + Parallel.For(0, block.Transactions.Length, parallelOptions, + _ => { + int i = 0; // Process addresses in sequential order, rather than partitioning scheme from Parallel.For // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 i = Interlocked.Increment(ref progress) - 1; Transaction tx = block.Transactions[i]; Address? sender = tx.SenderAddress; - if (sender is not null) + + var env = PreWarmer._envPool.Get(); + try { - scope.WorldState.WarmUp(sender); + using IReadOnlyTxProcessingScope scope = env.Build(StateRoot); + if (sender is not null) + { + scope.WorldState.WarmUp(sender); + } + Address to = tx.To; + if (to is not null) + { + scope.WorldState.WarmUp(to); + } } - Address to = tx.To; - if (to is not null) + finally { - scope.WorldState.WarmUp(to); + PreWarmer._envPool.Return(env); } - } - catch (OperationCanceledException) - { - // Ignore - } - catch (Exception ex) - { - if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses {i}", ex); - } - }); + }); + } + catch (OperationCanceledException) + { + // Ignore, block completed cancel + } } }