Skip to content

Commit

Permalink
Streamline prewarming error handling (#7491)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored and brbrr committed Sep 26, 2024
1 parent 90d8699 commit fe38bcf
Showing 1 changed file with 120 additions and 98 deletions.
218 changes: 120 additions & 98 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot

if (_logger.IsDebug) _logger.Debug($"Finished pre-warming caches for block {suggestedBlock.Number}.");
}
catch (OperationCanceledException)
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Debug($"Pre-warming caches cancelled for block {suggestedBlock.Number}.");
if (_logger.IsDebug) _logger.Warn($"Error pre-warming {suggestedBlock.Number}. {ex}");
}
finally
{
Expand All @@ -83,81 +83,96 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot
private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)

try
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
catch (OperationCanceledException)
{
// Ignore
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal {i}", ex);
}
finally
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
{
_envPool.Return(env);
}
});
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
finally
{
_envPool.Return(env);
}
});
}
}
catch (OperationCanceledException)
{
// Ignore, block completed cancel
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal", ex);
}
}

private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
try
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
try
int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
try
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (OperationCanceledException)
{
// Ignore
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
}
});
catch (Exception ex) when (ex is EvmException or OverflowException)
{
// Ignore, regular tx processing exceptions
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
}
});
}
catch (OperationCanceledException)
{
// Ignore, block completed cancel
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal", ex);
}
}

private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash256 stateRoot, AccessList? systemTxAccessList, BlockCachePreWarmer preWarmer)
Expand All @@ -173,69 +188,76 @@ private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash25

void IThreadPoolWorkItem.Execute()
{
IReadOnlyTxProcessorSource env = null;
try
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
env = PreWarmer._envPool.Get();
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
WarmupAddresses(parallelOptions, Block, scope);
}
catch (OperationCanceledException)
{
// Ignore
WarmupAddresses(parallelOptions, Block);
}
catch (Exception ex)
{
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses", ex);
}
finally
{
if (env is not null) PreWarmer._envPool.Return(env);
_doneEvent.Set();
}
}

private void WarmupAddresses(ParallelOptions parallelOptions, Block block, IReadOnlyTxProcessingScope scope)
private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

if (SystemTxAccessList is not null)
try
{
scope.WorldState.WarmUp(SystemTxAccessList);
}
if (SystemTxAccessList is not null)
{
var env = PreWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
scope.WorldState.WarmUp(SystemTxAccessList);
}
finally
{
PreWarmer._envPool.Return(env);
}
}

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
{
int i = 0;
try
int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
{
int i = 0;
// Process addresses in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
Transaction tx = block.Transactions[i];
Address? sender = tx.SenderAddress;
if (sender is not null)
var env = PreWarmer._envPool.Get();
try
{
scope.WorldState.WarmUp(sender);
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
}
Address to = tx.To;
if (to is not null)
{
scope.WorldState.WarmUp(to);
}
}
Address to = tx.To;
if (to is not null)
finally
{
scope.WorldState.WarmUp(to);
PreWarmer._envPool.Return(env);
}
}
catch (OperationCanceledException)
{
// Ignore
}
catch (Exception ex)
{
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses {i}", ex);
}
});
});
}
catch (OperationCanceledException)
{
// Ignore, block completed cancel
}
}
}

Expand Down

0 comments on commit fe38bcf

Please sign in to comment.