From f0ea7fcdcd997f346c808a528526730dc27693aa Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 31 Jan 2024 14:27:22 +0000 Subject: [PATCH] Exclude Tx Hashes when serving Blocks & apply network back pressure (#6636) --- .../UserOperationsMessageSerializer.cs | 2 +- .../Blocks/BlockStoreTests.cs | 10 +-- .../Nethermind.Blockchain/BlockTree.cs | 6 +- .../BlockTreeMethodOptions.cs | 3 +- .../Blocks/BlockStore.cs | 6 +- .../Blocks/IBlockStore.cs | 2 +- .../Processing/BlockProcessor.cs | 28 ++++++++ .../Nethermind.Merge.Plugin/IMergeConfig.cs | 2 +- .../Nethermind.Merge.Plugin/MergeConfig.cs | 2 +- .../P2P/PacketSenderTests.cs | 3 + .../AddCapabilityMessageSerializer.cs | 2 +- .../Messages/DisconnectMessageSerializer.cs | 2 +- .../P2P/Messages/HelloMessageSerializer.cs | 2 +- .../Nethermind.Network/P2P/PacketSender.cs | 2 +- .../Eth/HashesMessageSerializer.cs | 2 +- .../Messages/BlockBodiesMessageSerializer.cs | 2 +- .../Messages/BlockHeadersMessageSerializer.cs | 2 +- .../GetBlockBodiesMessageSerializer.cs | 2 +- .../GetBlockHeadersMessageSerializer.cs | 2 +- .../NewBlockHashesMessageSerializer.cs | 2 +- .../V62/Messages/NewBlockMessageSerializer.cs | 2 +- .../Messages/TransactionsMessageSerializer.cs | 2 +- .../V63/Messages/NodeDataMessageSerializer.cs | 2 +- .../V63/Messages/ReceiptsMessageSerializer.cs | 2 +- .../V66/Messages/Eth66MessageSerializer.cs | 2 +- ...ledTransactionHashesMessageSerializer68.cs | 2 +- .../Messages/BlockHeadersMessageSerializer.cs | 2 +- .../GetBlockHeadersMessageSerializer.cs | 2 +- .../Messages/NodeDataMessageSerializer.cs | 2 +- .../Messages/AccountRangeMessageSerializer.cs | 2 +- .../Messages/ByteCodesMessageSerializer.cs | 2 +- .../Messages/GetTrieNodesMessageSerializer.cs | 2 +- .../Snap/Messages/SnapSerializerBase.cs | 2 +- .../StorageRangesMessageSerializer.cs | 2 +- .../Messages/TrieNodesMessageSerializer.cs | 2 +- .../BlockWitnessHashesMessageSerializer.cs | 2 +- .../Handshake/AckEip8MessageSerializer.cs | 2 +- .../Handshake/AuthEip8MessageSerializer.cs | 2 +- .../Rlpx/Handshake/AuthMessageSerializer.cs | 2 +- .../Nethermind.Network/Rlpx/RlpxHost.cs | 21 ++++-- .../Rlpx/ZeroFrameEncoder.cs | 6 +- .../Rlpx/ZeroPacketSplitter.cs | 2 +- .../Rlpx/ZeroSnappyEncoder.cs | 3 +- .../BlockDecoder.cs | 4 +- .../KeyValueStoreRlpExtensions.cs | 17 ++--- .../NettyRlpStream.cs | 8 +-- .../RlpBehaviors.cs | 3 +- .../Nethermind.Serialization.Rlp/TxDecoder.cs | 64 +++++++++++-------- .../SyncServerTests.cs | 2 +- .../Nethermind.Synchronization/SyncServer.cs | 2 +- 50 files changed, 155 insertions(+), 99 deletions(-) diff --git a/src/Nethermind/Nethermind.AccountAbstraction/Network/UserOperationsMessageSerializer.cs b/src/Nethermind/Nethermind.AccountAbstraction/Network/UserOperationsMessageSerializer.cs index 33b17626ae9..d9a71523f48 100644 --- a/src/Nethermind/Nethermind.AccountAbstraction/Network/UserOperationsMessageSerializer.cs +++ b/src/Nethermind/Nethermind.AccountAbstraction/Network/UserOperationsMessageSerializer.cs @@ -15,7 +15,7 @@ public class UserOperationsMessageSerializer : IZeroInnerMessageSerializer GetAll(); ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash); void Cache(Block block); diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs index 9811501e4e4..bac27021a1a 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs @@ -4,6 +4,8 @@ using System; using System.Collections.Generic; using System.Numerics; +using System.Runtime.InteropServices; +using System.Threading; using Nethermind.Blockchain; using Nethermind.Blockchain.Receipts; using Nethermind.Consensus.BeaconBlockRoot; @@ -86,6 +88,7 @@ public Block[] Process(Hash256 newBranchStateRoot, List suggestedBlocks, { if (suggestedBlocks.Count == 0) return Array.Empty(); + TxHashCalculator.CalculateInBackground(suggestedBlocks); BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks)); /* We need to save the snapshot state root before reorganization in case the new branch has invalid blocks. @@ -371,4 +374,29 @@ private void ApplyDaoTransition(Block block) } } } + + private class TxHashCalculator(List suggestedBlocks) : IThreadPoolWorkItem + { + public static void CalculateInBackground(List suggestedBlocks) + { + // Memory has been reserved on the transactions to delay calculate the hashes + // We calculate the hashes in the background to release that memory + ThreadPool.UnsafeQueueUserWorkItem(new TxHashCalculator(suggestedBlocks), preferLocal: false); + } + + void IThreadPoolWorkItem.Execute() + { + // Hashes will be required for PersistentReceiptStorage in UpdateMainChain ForkchoiceUpdatedHandler + // Which occurs after the block has been processed; however the block is stored in cache and picked up + // from there so we can calculate the hashes now for that later use. + foreach (Block block in CollectionsMarshal.AsSpan(suggestedBlocks)) + { + foreach (Transaction tx in block.Transactions) + { + // Calculate the hashes to release the memory from the transactionSequence + tx.CalculateHashInternal(); + } + } + } + } } diff --git a/src/Nethermind/Nethermind.Merge.Plugin/IMergeConfig.cs b/src/Nethermind/Nethermind.Merge.Plugin/IMergeConfig.cs index 08e92c3c6a7..6bcae33e814 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/IMergeConfig.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/IMergeConfig.cs @@ -60,7 +60,7 @@ Request the garbage collector (GC) to release the process memory. - A positive number to release memory after that many Engine API calls - """, DefaultValue = "75")] + """, DefaultValue = "25")] public int CollectionsPerDecommit { get; set; } [ConfigItem(Description = "The timeout, in seconds, for the `engine_newPayload` method.", DefaultValue = "7", HiddenFromDocs = true)] diff --git a/src/Nethermind/Nethermind.Merge.Plugin/MergeConfig.cs b/src/Nethermind/Nethermind.Merge.Plugin/MergeConfig.cs index 36b1226d40f..f19799e5343 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/MergeConfig.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/MergeConfig.cs @@ -29,7 +29,7 @@ public class MergeConfig : IMergeConfig public GcCompaction CompactMemory { get; set; } = GcCompaction.Yes; - public int CollectionsPerDecommit { get; set; } = 75; + public int CollectionsPerDecommit { get; set; } = 25; public int NewPayloadTimeout { get; set; } = 7; diff --git a/src/Nethermind/Nethermind.Network.Test/P2P/PacketSenderTests.cs b/src/Nethermind/Nethermind.Network.Test/P2P/PacketSenderTests.cs index 69c18635941..d8ca14341fd 100644 --- a/src/Nethermind/Nethermind.Network.Test/P2P/PacketSenderTests.cs +++ b/src/Nethermind/Nethermind.Network.Test/P2P/PacketSenderTests.cs @@ -27,6 +27,7 @@ public void Does_send_on_active_channel() serialized.SafeRelease(); IChannelHandlerContext context = Substitute.For(); IChannel channel = Substitute.For(); + channel.IsWritable.Returns(true); channel.Active.Returns(true); context.Channel.Returns(channel); @@ -46,6 +47,7 @@ public void Does_not_try_to_send_on_inactive_channel() serialized.SafeRelease(); IChannelHandlerContext context = Substitute.For(); IChannel channel = Substitute.For(); + channel.IsWritable.Returns(true); channel.Active.Returns(false); context.Channel.Returns(channel); @@ -65,6 +67,7 @@ public async Task Send_after_delay_if_specified() serialized.SafeRelease(); IChannelHandlerContext context = Substitute.For(); IChannel channel = Substitute.For(); + channel.IsWritable.Returns(true); channel.Active.Returns(true); context.Channel.Returns(channel); diff --git a/src/Nethermind/Nethermind.Network/P2P/Messages/AddCapabilityMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Messages/AddCapabilityMessageSerializer.cs index 6acc3f0c59a..4a798306823 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Messages/AddCapabilityMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Messages/AddCapabilityMessageSerializer.cs @@ -15,7 +15,7 @@ public class AddCapabilityMessageSerializer : IZeroMessageSerializer public void Serialize(IByteBuffer byteBuffer, HelloMessage msg) { (int totalLength, int innerLength) length = GetLength(msg); - byteBuffer.EnsureWritable(Rlp.LengthOfSequence(length.totalLength), true); + byteBuffer.EnsureWritable(Rlp.LengthOfSequence(length.totalLength), force: true); NettyRlpStream stream = new(byteBuffer); stream.StartSequence(length.totalLength); stream.Encode(msg.P2PVersion); diff --git a/src/Nethermind/Nethermind.Network/P2P/PacketSender.cs b/src/Nethermind/Nethermind.Network/P2P/PacketSender.cs index 73beb5aac01..3248eb98115 100644 --- a/src/Nethermind/Nethermind.Network/P2P/PacketSender.cs +++ b/src/Nethermind/Nethermind.Network/P2P/PacketSender.cs @@ -27,7 +27,7 @@ public PacketSender(IMessageSerializationService messageSerializationService, IL public int Enqueue(T message) where T : P2PMessage { - if (!_context.Channel.Active) + if (!_context.Channel.IsWritable || !_context.Channel.Active) { return 0; } diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/HashesMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/HashesMessageSerializer.cs index 43860c56ce1..b11cc993ebb 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/HashesMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/HashesMessageSerializer.cs @@ -24,7 +24,7 @@ protected static Hash256[] DeserializeHashes(RlpStream rlpStream) public void Serialize(IByteBuffer byteBuffer, T message) { int length = GetLength(message, out int contentLength); - byteBuffer.EnsureWritable(length, true); + byteBuffer.EnsureWritable(length); RlpStream rlpStream = new NettyRlpStream(byteBuffer); rlpStream.StartSequence(contentLength); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Messages/BlockBodiesMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Messages/BlockBodiesMessageSerializer.cs index 72f84b5cabb..d4bba370b4f 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Messages/BlockBodiesMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Messages/BlockBodiesMessageSerializer.cs @@ -16,7 +16,7 @@ public class BlockBodiesMessageSerializer : IZeroInnerMessageSerializer ethMes public void Serialize(IByteBuffer byteBuffer, TEth66Message message) { int length = GetLength(message, out int contentLength); - byteBuffer.EnsureWritable(length, true); + byteBuffer.EnsureWritable(length); RlpStream rlpStream = new NettyRlpStream(byteBuffer); rlpStream.StartSequence(contentLength); rlpStream.Encode(message.RequestId); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Messages/NewPooledTransactionHashesMessageSerializer68.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Messages/NewPooledTransactionHashesMessageSerializer68.cs index c00f90c80fb..cb1cc6d0d37 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Messages/NewPooledTransactionHashesMessageSerializer68.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Messages/NewPooledTransactionHashesMessageSerializer68.cs @@ -36,7 +36,7 @@ public void Serialize(IByteBuffer byteBuffer, NewPooledTransactionHashesMessage6 int totalSize = Rlp.LengthOf(message.Types) + Rlp.LengthOfSequence(sizesLength) + Rlp.LengthOfSequence(hashesLength); - byteBuffer.EnsureWritable(totalSize, true); + byteBuffer.EnsureWritable(totalSize); RlpStream rlpStream = new NettyRlpStream(byteBuffer); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/BlockHeadersMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/BlockHeadersMessageSerializer.cs index 8bafc56f90f..7a80cba9989 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/BlockHeadersMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/BlockHeadersMessageSerializer.cs @@ -20,7 +20,7 @@ public void Serialize(IByteBuffer byteBuffer, BlockHeadersMessage message) int totalLength = Rlp.LengthOfSequence(contentLength); RlpStream rlpStream = new NettyRlpStream(byteBuffer); - byteBuffer.EnsureWritable(totalLength, true); + byteBuffer.EnsureWritable(totalLength); rlpStream.StartSequence(contentLength); rlpStream.Encode(message.RequestId); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/GetBlockHeadersMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/GetBlockHeadersMessageSerializer.cs index ee9594ece33..e522b890213 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/GetBlockHeadersMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Les/Messages/GetBlockHeadersMessageSerializer.cs @@ -17,7 +17,7 @@ public void Serialize(IByteBuffer byteBuffer, GetBlockHeadersMessage message) int totalLength = Rlp.LengthOfSequence(contentLength); RlpStream rlpStream = new NettyRlpStream(byteBuffer); - byteBuffer.EnsureWritable(totalLength, true); + byteBuffer.EnsureWritable(totalLength); rlpStream.StartSequence(contentLength); rlpStream.Encode(message.RequestId); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/NodeData/Messages/NodeDataMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/NodeData/Messages/NodeDataMessageSerializer.cs index c3af8fb3b72..37554421383 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/NodeData/Messages/NodeDataMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/NodeData/Messages/NodeDataMessageSerializer.cs @@ -11,7 +11,7 @@ public class NodeDataMessageSerializer : IZeroInnerMessageSerializer : IZeroInnerMessageSerializer whe protected NettyRlpStream GetRlpStreamAndStartSequence(IByteBuffer byteBuffer, T msg) { int totalLength = GetLength(msg, out int contentLength); - byteBuffer.EnsureWritable(totalLength, true); + byteBuffer.EnsureWritable(totalLength); NettyRlpStream stream = new(byteBuffer); stream.StartSequence(contentLength); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/StorageRangesMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/StorageRangesMessageSerializer.cs index 3305091410d..90d498c4a3e 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/StorageRangesMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/StorageRangesMessageSerializer.cs @@ -25,7 +25,7 @@ public void Serialize(IByteBuffer byteBuffer, StorageRangeMessage message) { (int contentLength, int allSlotsLength, int[] accountSlotsLengths, int proofsLength) = CalculateLengths(message); - byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true); + byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength)); NettyRlpStream stream = new(byteBuffer); stream.StartSequence(contentLength); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/TrieNodesMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/TrieNodesMessageSerializer.cs index 436bd1d1833..a7f5a64c0ef 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/TrieNodesMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/Messages/TrieNodesMessageSerializer.cs @@ -12,7 +12,7 @@ public void Serialize(IByteBuffer byteBuffer, TrieNodesMessage message) { (int contentLength, int nodesLength) = GetLength(message); - byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true); + byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength)); NettyRlpStream rlpStream = new(byteBuffer); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Wit/Messages/BlockWitnessHashesMessageSerializer.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Wit/Messages/BlockWitnessHashesMessageSerializer.cs index 3e7df9d3bb7..23f99bfa29d 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Wit/Messages/BlockWitnessHashesMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Wit/Messages/BlockWitnessHashesMessageSerializer.cs @@ -15,7 +15,7 @@ public void Serialize(IByteBuffer byteBuffer, BlockWitnessHashesMessage message) int contentLength = GetLength(message, out int totalLength); - byteBuffer.EnsureWritable(totalLength, true); + byteBuffer.EnsureWritable(totalLength); nettyRlpStream.StartSequence(contentLength); nettyRlpStream.Encode(message.RequestId); if (message.Hashes is null) diff --git a/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AckEip8MessageSerializer.cs b/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AckEip8MessageSerializer.cs index 1151e16baca..4bc7552cf93 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AckEip8MessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AckEip8MessageSerializer.cs @@ -28,7 +28,7 @@ public void Serialize(IByteBuffer byteBuffer, AckEip8Message msg) totalLength += Rlp.LengthOf(msg.Nonce); totalLength += Rlp.LengthOf(msg.Version); - byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength), true); + byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength)); NettyRlpStream stream = new(byteBuffer); stream.StartSequence(totalLength); stream.Encode(msg.EphemeralPublicKey.Bytes); diff --git a/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthEip8MessageSerializer.cs b/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthEip8MessageSerializer.cs index a51ae149752..d310ecec7cb 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthEip8MessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthEip8MessageSerializer.cs @@ -22,7 +22,7 @@ public void Serialize(IByteBuffer byteBuffer, AuthEip8Message msg) { int totalLength = GetLength(msg); // TODO: Account for the padding - byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength), true); + byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength)); NettyRlpStream stream = new(byteBuffer); stream.StartSequence(totalLength); stream.Encode(Bytes.Concat(msg.Signature.Bytes, msg.Signature.RecoveryId)); diff --git a/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthMessageSerializer.cs b/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthMessageSerializer.cs index 3269c93934d..f87075d06d7 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthMessageSerializer.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/Handshake/AuthMessageSerializer.cs @@ -37,7 +37,7 @@ public class AuthMessageSerializer : IZeroMessageSerializer public void Serialize(IByteBuffer byteBuffer, AuthMessage msg) { - byteBuffer.EnsureWritable(Length, true); + byteBuffer.EnsureWritable(Length); byteBuffer.WriteBytes(msg.Signature.Bytes); byteBuffer.WriteByte(msg.Signature.RecoveryId); byteBuffer.WriteBytes(msg.EphemeralPublicHash.Bytes); diff --git a/src/Nethermind/Nethermind.Network/Rlpx/RlpxHost.cs b/src/Nethermind/Nethermind.Network/Rlpx/RlpxHost.cs index eec9422046d..7e9305057df 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/RlpxHost.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/RlpxHost.cs @@ -12,6 +12,7 @@ using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using Nethermind.Core.Crypto; +using Nethermind.Core.Extensions; using Nethermind.Logging; using Nethermind.Network.Config; using Nethermind.Network.P2P; @@ -112,6 +113,11 @@ public async Task Init() .Group(_bossGroup, _workerGroup) .Channel() .ChildOption(ChannelOption.SoBacklog, 100) + .ChildOption(ChannelOption.TcpNodelay, true) + .ChildOption(ChannelOption.SoTimeout, (int)_connectTimeout.TotalMilliseconds) + .ChildOption(ChannelOption.SoKeepalive, true) + .ChildOption(ChannelOption.WriteBufferHighWaterMark, (int)3.MB()) + .ChildOption(ChannelOption.WriteBufferLowWaterMark, (int)1.MB()) .Handler(new LoggingHandler("BOSS", LogLevel.TRACE)) .ChildHandler(new ActionChannelInitializer(ch => { @@ -164,11 +170,16 @@ public async Task ConnectAsync(Node node) if (_logger.IsTrace) _logger.Trace($"|NetworkTrace| {node:s} initiating OUT connection"); Bootstrap clientBootstrap = new(); - clientBootstrap.Group(_workerGroup); - clientBootstrap.Channel(); - clientBootstrap.Option(ChannelOption.TcpNodelay, true); - clientBootstrap.Option(ChannelOption.MessageSizeEstimator, DefaultMessageSizeEstimator.Default); - clientBootstrap.Option(ChannelOption.ConnectTimeout, _connectTimeout); + clientBootstrap + .Group(_workerGroup) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Option(ChannelOption.SoTimeout, (int)_connectTimeout.TotalMilliseconds) + .Option(ChannelOption.SoKeepalive, true) + .Option(ChannelOption.WriteBufferHighWaterMark, (int)3.MB()) + .Option(ChannelOption.WriteBufferLowWaterMark, (int)1.MB()) + .Option(ChannelOption.MessageSizeEstimator, DefaultMessageSizeEstimator.Default) + .Option(ChannelOption.ConnectTimeout, _connectTimeout); clientBootstrap.Handler(new ActionChannelInitializer(ch => { Session session = new(LocalPort, node, ch, _disconnectsAnalyzer, _logManager); diff --git a/src/Nethermind/Nethermind.Network/Rlpx/ZeroFrameEncoder.cs b/src/Nethermind/Nethermind.Network/Rlpx/ZeroFrameEncoder.cs index 2dccd6af28d..b5307cb624b 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/ZeroFrameEncoder.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/ZeroFrameEncoder.cs @@ -38,11 +38,7 @@ protected override void Encode(IChannelHandlerContext context, IByteBuffer input FrameHeaderReader.FrameInfo frame = _headerReader.ReadFrameHeader(input); - // 0 if the buffer has enough writable bytes, and its capacity is unchanged. - // 1 if the buffer does not have enough bytes, and its capacity is unchanged. - // 2 if the buffer has enough writable bytes, and its capacity has been increased. - // 3 if the buffer does not have enough bytes, but its capacity has been increased to its maximum. - _ = output.EnsureWritable(Frame.HeaderSize + Frame.MacSize + frame.PayloadSize + Frame.MacSize, true); + output.EnsureWritable(Frame.HeaderSize + Frame.MacSize + frame.PayloadSize + Frame.MacSize); WriteHeader(output); WriteHeaderMac(output); diff --git a/src/Nethermind/Nethermind.Network/Rlpx/ZeroPacketSplitter.cs b/src/Nethermind/Nethermind.Network/Rlpx/ZeroPacketSplitter.cs index 03ed67d0c29..b3fcc5ad2e3 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/ZeroPacketSplitter.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/ZeroPacketSplitter.cs @@ -46,7 +46,7 @@ protected override void Encode(IChannelHandlerContext context, IByteBuffer input int totalPayloadOffset = MaxFrameSize * i; int framePayloadSize = Math.Min(MaxFrameSize, totalPayloadSize - totalPayloadOffset); int paddingSize = i == framesCount - 1 ? Frame.CalculatePadding(totalPayloadSize) : 0; - output.EnsureWritable(Frame.HeaderSize + framePayloadSize + paddingSize, true); + output.EnsureWritable(Frame.HeaderSize + framePayloadSize + paddingSize); // 000 - 016 | header // 016 - 01x | packet type diff --git a/src/Nethermind/Nethermind.Network/Rlpx/ZeroSnappyEncoder.cs b/src/Nethermind/Nethermind.Network/Rlpx/ZeroSnappyEncoder.cs index 0ead11eacf4..72669447986 100644 --- a/src/Nethermind/Nethermind.Network/Rlpx/ZeroSnappyEncoder.cs +++ b/src/Nethermind/Nethermind.Network/Rlpx/ZeroSnappyEncoder.cs @@ -23,10 +23,9 @@ protected override void Encode(IChannelHandlerContext context, IByteBuffer input { byte packetType = input.ReadByte(); - output.EnsureWritable(1, true); + output.EnsureWritable(1 + SnappyCodec.GetMaxCompressedLength(input.ReadableBytes)); output.WriteByte(packetType); - output.EnsureWritable(SnappyCodec.GetMaxCompressedLength(input.ReadableBytes), true); if (_logger.IsTrace) _logger.Trace($"Compressing with Snappy a message of length {input.ReadableBytes}"); int length = SnappyCodec.Compress( input.Array, diff --git a/src/Nethermind/Nethermind.Serialization.Rlp/BlockDecoder.cs b/src/Nethermind/Nethermind.Serialization.Rlp/BlockDecoder.cs index 54c26d93d2c..71a5256baf1 100644 --- a/src/Nethermind/Nethermind.Serialization.Rlp/BlockDecoder.cs +++ b/src/Nethermind/Nethermind.Serialization.Rlp/BlockDecoder.cs @@ -37,7 +37,7 @@ public class BlockDecoder : IRlpValueDecoder, IRlpStreamDecoder List transactions = new(); while (rlpStream.Position < transactionsCheck) { - transactions.Add(Rlp.Decode(rlpStream)); + transactions.Add(Rlp.Decode(rlpStream, rlpBehaviors)); } rlpStream.Check(transactionsCheck); @@ -176,7 +176,7 @@ public int GetLength(Block? item, RlpBehaviors rlpBehaviors) List transactions = new(); while (decoderContext.Position < transactionsCheck) { - transactions.Add(Rlp.Decode(ref decoderContext)); + transactions.Add(Rlp.Decode(ref decoderContext, rlpBehaviors)); } decoderContext.Check(transactionsCheck); diff --git a/src/Nethermind/Nethermind.Serialization.Rlp/KeyValueStoreRlpExtensions.cs b/src/Nethermind/Nethermind.Serialization.Rlp/KeyValueStoreRlpExtensions.cs index 5ddf500d2be..88694f12b15 100644 --- a/src/Nethermind/Nethermind.Serialization.Rlp/KeyValueStoreRlpExtensions.cs +++ b/src/Nethermind/Nethermind.Serialization.Rlp/KeyValueStoreRlpExtensions.cs @@ -15,22 +15,22 @@ public static class KeyValueStoreRlpExtensions { [SkipLocalsInit] public static TItem? Get(this IReadOnlyKeyValueStore db, long blockNumber, ValueHash256 hash, IRlpStreamDecoder decoder, - LruCache cache = null, bool shouldCache = true) where TItem : class + LruCache cache = null, RlpBehaviors rlpBehaviors = RlpBehaviors.None, bool shouldCache = true) where TItem : class { Span dbKey = stackalloc byte[40]; KeyValueStoreExtensions.GetBlockNumPrefixedKey(blockNumber, hash, dbKey); - return Get(db, hash, dbKey, decoder, cache, shouldCache); + return Get(db, hash, dbKey, decoder, cache, rlpBehaviors, shouldCache); } - public static TItem? Get(this IReadOnlyKeyValueStore db, ValueHash256 key, IRlpStreamDecoder decoder, LruCache cache = null, bool shouldCache = true) where TItem : class + public static TItem? Get(this IReadOnlyKeyValueStore db, ValueHash256 key, IRlpStreamDecoder decoder, LruCache cache = null, RlpBehaviors rlpBehaviors = RlpBehaviors.None, bool shouldCache = true) where TItem : class { - return Get(db, key, key.Bytes, decoder, cache, shouldCache); + return Get(db, key, key.Bytes, decoder, cache, rlpBehaviors, shouldCache); } - public static TItem? Get(this IReadOnlyKeyValueStore db, long key, IRlpStreamDecoder? decoder, LruCache? cache = null, bool shouldCache = true) where TItem : class + public static TItem? Get(this IReadOnlyKeyValueStore db, long key, IRlpStreamDecoder? decoder, LruCache? cache = null, RlpBehaviors rlpBehaviors = RlpBehaviors.None, bool shouldCache = true) where TItem : class { byte[] keyDb = key.ToBigEndianByteArrayWithoutLeadingZeros(); - return Get(db, key, keyDb, decoder, cache, shouldCache); + return Get(db, key, keyDb, decoder, cache, rlpBehaviors, shouldCache); } public static TItem? Get( @@ -39,6 +39,7 @@ public static class KeyValueStoreRlpExtensions ReadOnlySpan key, IRlpStreamDecoder decoder, LruCache cache = null, + RlpBehaviors rlpBehaviors = RlpBehaviors.None, bool shouldCache = true ) where TItem : class { @@ -61,7 +62,7 @@ public static class KeyValueStoreRlpExtensions } var rlpValueContext = data.AsRlpValueContext(); - item = valueDecoder.Decode(ref rlpValueContext, RlpBehaviors.AllowExtraBytes); + item = valueDecoder.Decode(ref rlpValueContext, rlpBehaviors | RlpBehaviors.AllowExtraBytes); } finally { @@ -82,7 +83,7 @@ public static class KeyValueStoreRlpExtensions using NettyRlpStream nettyRlpStream = new NettyRlpStream(buff); - item = decoder.Decode(nettyRlpStream, RlpBehaviors.AllowExtraBytes); + item = decoder.Decode(nettyRlpStream, rlpBehaviors | RlpBehaviors.AllowExtraBytes); } } diff --git a/src/Nethermind/Nethermind.Serialization.Rlp/NettyRlpStream.cs b/src/Nethermind/Nethermind.Serialization.Rlp/NettyRlpStream.cs index 66f21af06c3..652a3dded3b 100644 --- a/src/Nethermind/Nethermind.Serialization.Rlp/NettyRlpStream.cs +++ b/src/Nethermind/Nethermind.Serialization.Rlp/NettyRlpStream.cs @@ -22,7 +22,7 @@ public NettyRlpStream(IByteBuffer buffer) public override void Write(ReadOnlySpan bytesToWrite) { - _buffer.EnsureWritable(bytesToWrite.Length, true); + _buffer.EnsureWritable(bytesToWrite.Length); Span target = _buffer.Array.AsSpan(_buffer.ArrayOffset + _buffer.WriterIndex, bytesToWrite.Length); @@ -34,7 +34,7 @@ public override void Write(ReadOnlySpan bytesToWrite) public override void Write(IReadOnlyList bytesToWrite) { - _buffer.EnsureWritable(bytesToWrite.Count, true); + _buffer.EnsureWritable(bytesToWrite.Count); Span target = _buffer.Array.AsSpan(_buffer.ArrayOffset + _buffer.WriterIndex, bytesToWrite.Count); for (int i = 0; i < bytesToWrite.Count; ++i) @@ -48,13 +48,13 @@ public override void Write(IReadOnlyList bytesToWrite) public override void WriteByte(byte byteToWrite) { - _buffer.EnsureWritable(1, true); + _buffer.EnsureWritable(1); _buffer.WriteByte(byteToWrite); } protected override void WriteZero(int length) { - _buffer.EnsureWritable(length, true); + _buffer.EnsureWritable(length); _buffer.WriteZero(length); } diff --git a/src/Nethermind/Nethermind.Serialization.Rlp/RlpBehaviors.cs b/src/Nethermind/Nethermind.Serialization.Rlp/RlpBehaviors.cs index 22ea7ab40e6..7fc2de36c9a 100644 --- a/src/Nethermind/Nethermind.Serialization.Rlp/RlpBehaviors.cs +++ b/src/Nethermind/Nethermind.Serialization.Rlp/RlpBehaviors.cs @@ -30,5 +30,6 @@ public enum RlpBehaviors /// See https://eips.ethereum.org/EIPS/eip-4844#networking /// InMempoolForm = 64, - All = AllowExtraBytes | ForSealing | Storage | Eip658Receipts | AllowUnsigned | SkipTypedWrapping | InMempoolForm + ExcludeHashes = 128, + All = AllowExtraBytes | ForSealing | Storage | Eip658Receipts | AllowUnsigned | SkipTypedWrapping | InMempoolForm | ExcludeHashes, } diff --git a/src/Nethermind/Nethermind.Serialization.Rlp/TxDecoder.cs b/src/Nethermind/Nethermind.Serialization.Rlp/TxDecoder.cs index b99f719f1fd..ba1172a0b2d 100644 --- a/src/Nethermind/Nethermind.Serialization.Rlp/TxDecoder.cs +++ b/src/Nethermind/Nethermind.Serialization.Rlp/TxDecoder.cs @@ -115,22 +115,28 @@ protected virtual T NewTx() break; } - if ((rlpBehaviors & RlpBehaviors.AllowExtraBytes) != RlpBehaviors.AllowExtraBytes) + if ((rlpBehaviors & RlpBehaviors.AllowExtraBytes) == 0) { rlpStream.Check(positionAfterNetworkWrapper); } - transaction.Hash = CalculateHashForNetworkPayloadForm(transaction.Type, transactionSequence); - } - else if (transactionSequence.Length <= TxDecoder.MaxDelayedHashTxnSize && _lazyHash) - { - // Delay hash generation, as may be filtered as having too low gas etc - transaction.SetPreHashNoLock(transactionSequence); + if ((rlpBehaviors & RlpBehaviors.ExcludeHashes) == 0) + { + transaction.Hash = CalculateHashForNetworkPayloadForm(transaction.Type, transactionSequence); + } } - else + else if ((rlpBehaviors & RlpBehaviors.ExcludeHashes) == 0) { - // Just calculate the Hash as txn too large - transaction.Hash = Keccak.Compute(transactionSequence); + if (transactionSequence.Length <= TxDecoder.MaxDelayedHashTxnSize && _lazyHash) + { + // Delay hash generation, as may be filtered as having too low gas etc + transaction.SetPreHashNoLock(transactionSequence); + } + else + { + // Just calculate the Hash as txn too large + transaction.Hash = Keccak.Compute(transactionSequence); + } } return transaction; @@ -480,34 +486,40 @@ public void Decode(ref Rlp.ValueDecoderContext decoderContext, ref T? transactio break; } - if ((rlpBehaviors & RlpBehaviors.AllowExtraBytes) != RlpBehaviors.AllowExtraBytes) + if ((rlpBehaviors & RlpBehaviors.AllowExtraBytes) == 0) { decoderContext.Check(networkWrapperCheck); } - transaction.Hash = CalculateHashForNetworkPayloadForm(transaction.Type, transactionSequence); + if ((rlpBehaviors & RlpBehaviors.ExcludeHashes) == 0) + { + transaction.Hash = CalculateHashForNetworkPayloadForm(transaction.Type, transactionSequence); + } } - else if (transactionSequence.Length <= TxDecoder.MaxDelayedHashTxnSize && _lazyHash) + else if ((rlpBehaviors & RlpBehaviors.ExcludeHashes) == 0) { - // Delay hash generation, as may be filtered as having too low gas etc - if (decoderContext.ShouldSliceMemory) + if (transactionSequence.Length <= TxDecoder.MaxDelayedHashTxnSize && _lazyHash) { - // Do not copy the memory in this case. - int currentPosition = decoderContext.Position; - decoderContext.Position = txSequenceStart; - transaction.SetPreHashMemoryNoLock(decoderContext.ReadMemory(transactionSequence.Length)); - decoderContext.Position = currentPosition; + // Delay hash generation, as may be filtered as having too low gas etc + if (decoderContext.ShouldSliceMemory) + { + // Do not copy the memory in this case. + int currentPosition = decoderContext.Position; + decoderContext.Position = txSequenceStart; + transaction.SetPreHashMemoryNoLock(decoderContext.ReadMemory(transactionSequence.Length)); + decoderContext.Position = currentPosition; + } + else + { + transaction.SetPreHashNoLock(transactionSequence); + } } else { - transaction.SetPreHashNoLock(transactionSequence); + // Just calculate the Hash immediately as txn too large + transaction.Hash = Keccak.Compute(transactionSequence); } } - else - { - // Just calculate the Hash immediately as txn too large - transaction.Hash = Keccak.Compute(transactionSequence); - } } private static void DecodeSignature( diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs index 8ecab27bc76..31880468319 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs @@ -77,7 +77,7 @@ public void When_finding_by_hash_block_info_is_not_loaded() { Context ctx = new(); ctx.SyncServer.Find(TestItem.KeccakA); - ctx.BlockTree.Received().FindBlock(Arg.Any(), BlockTreeLookupOptions.TotalDifficultyNotNeeded); + ctx.BlockTree.Received().FindBlock(Arg.Any(), BlockTreeLookupOptions.TotalDifficultyNotNeeded | BlockTreeLookupOptions.ExcludeTxHashes); } [TestCase(true, true, true)] diff --git a/src/Nethermind/Nethermind.Synchronization/SyncServer.cs b/src/Nethermind/Nethermind.Synchronization/SyncServer.cs index 84e47412f08..fa3b08bc221 100644 --- a/src/Nethermind/Nethermind.Synchronization/SyncServer.cs +++ b/src/Nethermind/Nethermind.Synchronization/SyncServer.cs @@ -414,7 +414,7 @@ public BlockHeader FindLowestCommonAncestor(BlockHeader firstDescendant, BlockHe return _blockTree.FindLowestCommonAncestor(firstDescendant, secondDescendant, Sync.MaxReorgLength); } - public Block Find(Hash256 hash) => _blockTree.FindBlock(hash, BlockTreeLookupOptions.TotalDifficultyNotNeeded); + public Block Find(Hash256 hash) => _blockTree.FindBlock(hash, BlockTreeLookupOptions.TotalDifficultyNotNeeded | BlockTreeLookupOptions.ExcludeTxHashes); public Hash256? FindHash(long number) {