diff --git a/CHANGES.md b/CHANGES.md index 1c30112682c..3b8078aeb5d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,10 @@ To be released. ### Backward-incompatible API changes + - Changed `IMessageCodec.Encode(MessageContent, PrivateKey, + AppProtocolVersion, BoundPeer, DateTimeOffset, byte[]?)` to + `IMessageCodec.Encode(Message, PrivateKey)`. [[#3997]] + ### Backward-incompatible network protocol changes ### Backward-incompatible storage format changes @@ -24,6 +28,8 @@ To be released. ### CLI tools +[#3997]: https://github.com/planetarium/libplanet/pull/3997 + Version 5.4.0 ------------- diff --git a/src/Libplanet.Net/Messages/IMessageCodec.cs b/src/Libplanet.Net/Messages/IMessageCodec.cs index fb56c15c3ed..7dc8184d680 100644 --- a/src/Libplanet.Net/Messages/IMessageCodec.cs +++ b/src/Libplanet.Net/Messages/IMessageCodec.cs @@ -8,32 +8,17 @@ public interface IMessageCodec { /// /// Encodes the message to -typed instance with given - /// and . + /// for signing. /// - /// The message body to encode. + /// The message to encode. /// The to sign the encoded message. /// - /// The of - /// the transport layer. - /// The -typed representation of - /// the transport layer. - /// - /// The of the time - /// is encoded. - /// - /// The byte array identifies the message to match between - /// message and its respond used in . /// A containing the signed . /// /// Thrown when 's - /// does not match that of . - T Encode( - MessageContent content, - PrivateKey privateKey, - AppProtocolVersion appProtocolVersion, - BoundPeer peer, - DateTimeOffset timestamp, - byte[]? identity); + /// does not match that of . + /// + T Encode(Message message, PrivateKey privateKey); /// /// Decodes given -typed into diff --git a/src/Libplanet.Net/Messages/NetMQMessageCodec.cs b/src/Libplanet.Net/Messages/NetMQMessageCodec.cs index 5fa79fc817f..6bb7f29f21f 100644 --- a/src/Libplanet.Net/Messages/NetMQMessageCodec.cs +++ b/src/Libplanet.Net/Messages/NetMQMessageCodec.cs @@ -13,8 +13,6 @@ public class NetMQMessageCodec : IMessageCodec public static readonly int CommonFrames = Enum.GetValues(typeof(MessageFrame)).Length; - private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.ffffffZ"; - private readonly Codec _codec; /// @@ -53,38 +51,34 @@ public enum MessageFrame Sign = 4, } - /// + /// public NetMQMessage Encode( - MessageContent content, - PrivateKey privateKey, - AppProtocolVersion appProtocolVersion, - BoundPeer peer, - DateTimeOffset timestamp, - byte[]? identity = null) + Message message, + PrivateKey privateKey) { - if (!privateKey.PublicKey.Equals(peer.PublicKey)) + if (!privateKey.PublicKey.Equals(message.Remote.PublicKey)) { throw new InvalidCredentialException( - $"An invalid private key was provided: " + - $"the provided private key's expected public key is {peer.PublicKey} " + + $"An invalid private key was provided: the provided private key's " + + $"expected public key is {message.Remote.PublicKey} " + $"but its actual public key is {privateKey.PublicKey}.", - peer.PublicKey, + message.Remote.PublicKey, privateKey.PublicKey); } var netMqMessage = new NetMQMessage(); // Write body (by concrete class) - foreach (byte[] frame in content.DataFrames) + foreach (byte[] frame in message.Content.DataFrames) { netMqMessage.Append(frame); } // Write headers. (inverse order, version-type-peer-timestamp) - netMqMessage.Push(timestamp.Ticks); - netMqMessage.Push(_codec.Encode(peer.Bencoded)); - netMqMessage.Push((int)content.Type); - netMqMessage.Push(appProtocolVersion.Token); + netMqMessage.Push(message.Timestamp.Ticks); + netMqMessage.Push(_codec.Encode(message.Remote.Bencoded)); + netMqMessage.Push((int)message.Content.Type); + netMqMessage.Push(message.Version.Token); // Make and insert signature byte[] signature = privateKey.Sign(netMqMessage.ToByteArray()); @@ -92,31 +86,15 @@ public NetMQMessage Encode( frames.Insert((int)MessageFrame.Sign, new NetMQFrame(signature)); netMqMessage = new NetMQMessage(frames); - if (identity != null) + if (message.Identity is { }) { - netMqMessage.Push(identity); + netMqMessage.Push(message.Identity); } return netMqMessage; } - /// - /// Parses a from given . - /// - /// A encoded . - /// A flag to express whether the target is a reply of other message. - /// - /// Thrown if given - /// has not enough for parsing a message type. - /// Returns a of given - /// . If given value cannot be - /// interpreted in , - /// this would return a integer number. - public MessageContent.MessageType ParseMessageType(NetMQMessage encoded, bool reply) - => (MessageContent.MessageType)encoded[(int)MessageFrame.Type + (reply ? 0 : 1)] - .ConvertToInt32(); - - /// + /// public Message Decode(NetMQMessage encoded, bool reply) { if (encoded.FrameCount == 0) diff --git a/src/Libplanet.Net/Transports/BoundPeerExtensions.cs b/src/Libplanet.Net/Transports/BoundPeerExtensions.cs index 9187237aa6d..85c66dfceeb 100644 --- a/src/Libplanet.Net/Transports/BoundPeerExtensions.cs +++ b/src/Libplanet.Net/Transports/BoundPeerExtensions.cs @@ -35,11 +35,13 @@ public static AppProtocolVersion QueryAppProtocolVersionNetMQ( var ping = new PingMsg(); var netMQMessageCodec = new NetMQMessageCodec(); NetMQMessage request = netMQMessageCodec.Encode( - ping, - privateKey, - default, - new BoundPeer(privateKey.PublicKey, new DnsEndPoint("0.0.0.0", 0)), - DateTimeOffset.UtcNow); + new Message( + ping, + default, + new BoundPeer(privateKey.PublicKey, new DnsEndPoint("0.0.0.0", 0)), + DateTimeOffset.UtcNow, + null), + privateKey); TimeSpan timeoutNotNull = timeout ?? TimeSpan.FromSeconds(5); try diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index cb70133db43..4c197c6c11a 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -38,7 +38,7 @@ public class NetMQTransport : ITransport private readonly AsyncManualResetEvent _runningEvent; private readonly ActivitySource _activitySource; - private NetMQQueue<(AsyncManualResetEvent, NetMQMessage)>? _replyQueue; + private NetMQQueue<(AsyncManualResetEvent, Message)>? _replyQueue; private RouterSocket? _router; private NetMQPoller? _routerPoller; @@ -203,7 +203,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) _turnCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _replyQueue = new NetMQQueue<(AsyncManualResetEvent, NetMQMessage)>(); + _replyQueue = new NetMQQueue<(AsyncManualResetEvent, Message)>(); _routerPoller = new NetMQPoller { _router!, _replyQueue }; _router!.ReceiveReady += ReceiveMessage!; @@ -349,19 +349,15 @@ CancellationToken cancellationToken try { - DateTimeOffset now = DateTimeOffset.UtcNow; - NetMQMessage rawMessage = _messageCodec.Encode( - content, - _privateKey, - _appProtocolVersionOptions.AppProtocolVersion, - AsPeer, - DateTimeOffset.UtcNow - ); var req = new MessageRequest( reqId, - rawMessage, + new Message( + content, + _appProtocolVersionOptions.AppProtocolVersion, + AsPeer, + DateTimeOffset.UtcNow, + null), peer, - now, expectedResponses, channel, linkedCt); @@ -550,14 +546,12 @@ public async Task ReplyMessageAsync( _replyQueue!.Enqueue( ( ev, - _messageCodec.Encode( + new Message( content, - _privateKey, _appProtocolVersionOptions.AppProtocolVersion, AsPeer, DateTimeOffset.UtcNow, - identity - ) + identity) ) ); @@ -724,25 +718,28 @@ await ReplyMessageAsync( private void DoReply( object? sender, - NetMQQueueEventArgs<(AsyncManualResetEvent, NetMQMessage)> e + NetMQQueueEventArgs<(AsyncManualResetEvent, Message)> e ) { - (AsyncManualResetEvent ev, NetMQMessage message) = e.Queue.Dequeue(); - string reqId = message[0].Buffer.Length == 16 ? - new Guid(message[0].ToByteArray()).ToString() : "unknown"; - string messageType = _messageCodec.ParseMessageType(message, false).ToString(); + (AsyncManualResetEvent ev, Message message) = e.Queue.Dequeue(); + string reqId = message.Identity is { } identity && identity.Length == 16 + ? new Guid(identity).ToString() + : "unknown"; // FIXME The current timeout value(1 sec) is arbitrary. // We should make this configurable or fix it to an unneeded structure. - if (_router!.TrySendMultipartMessage(TimeSpan.FromSeconds(1), message)) + NetMQMessage netMQMessage = _messageCodec.Encode(message, _privateKey); + if (_router!.TrySendMultipartMessage(TimeSpan.FromSeconds(1), netMQMessage)) { _logger.Debug( - "{Message} as a reply to {Identity} sent", messageType, reqId); + "{Message} as a reply to {Identity} sent", message.Content.Type, reqId); } else { _logger.Debug( - "Failed to send {Message} as a reply to {Identity}", messageType, reqId); + "Failed to send {Message} as a reply to {Identity}", + message.Content.Type, + reqId); } ev.Set(); @@ -763,11 +760,10 @@ private async Task ProcessRuntime(CancellationToken cancellationToken) _logger.Verbose(waitMsg); MessageRequest req = await reader.ReadAsync(cancellationToken); #endif - string messageType = _messageCodec.ParseMessageType(req.Message, true).ToString(); long left = Interlocked.Decrement(ref _requestCount); _logger.Debug( "Request {Message} {RequestId} taken for processing; {Count} requests left", - messageType, + req.Message.Content.Type, req.Id, left); @@ -783,21 +779,20 @@ private async Task ProcessRuntime(CancellationToken cancellationToken) private async Task ProcessRequest(MessageRequest req, CancellationToken cancellationToken) { - string messageType = _messageCodec.ParseMessageType(req.Message, true).ToString(); Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); _logger.Debug( "Request {Message} {RequestId} is ready to be processed in {TimeSpan}", - messageType, + req.Message.Content.Type, req.Id, - DateTimeOffset.UtcNow - req.RequestedTime); + DateTimeOffset.UtcNow - req.Message.Timestamp); Channel channel = req.Channel; _logger.Debug( "Trying to send request {Message} {RequestId} to {Peer}", - messageType, + req.Message.Content.Type, req.Id, req.Peer ); @@ -817,7 +812,7 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella _logger.Debug( "Trying to connect to {Peer} for request {Message} {RequestId}", req.Peer, - messageType, + req.Message.Content.Type, req.Id); dealer.Connect(await req.Peer.ResolveNetMQAddressAsync()); incrementedSocketCount = Interlocked.Increment(ref _socketCount); @@ -828,7 +823,7 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella "{SocketCount} sockets open for processing request " + "{Message} {RequestId}", incrementedSocketCount, - messageType, + req.Message.Content.Type, req.Id); } catch (NetMQException nme) @@ -843,17 +838,20 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella nme, logMsg, Interlocked.Read(ref _socketCount), - messageType, + req.Message.Content.Type, req.Id); throw; } - if (dealer.TrySendMultipartMessage(req.Message)) + var netMQMessage = _messageCodec.Encode( + req.Message, + _privateKey); + if (dealer.TrySendMultipartMessage(netMQMessage)) { _logger.Debug( "Request {RequestId} {Message} sent to {Peer}", req.Id, - messageType, + req.Message.Content.Type, req.Peer); } else @@ -861,11 +859,11 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella _logger.Debug( "Failed to send {RequestId} {Message} to {Peer}", req.Id, - messageType, + req.Message.Content.Type, req.Peer); throw new SendMessageFailException( - $"Failed to send {messageType} to {req.Peer}.", + $"Failed to send {req.Message.Content.Type} to {req.Peer}.", req.Peer); } @@ -893,7 +891,7 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella _logger.Error( e, "Failed to process {Message} {RequestId}; discarding it", - messageType, + req.Message.Content.Type, req.Id); channel.Writer.TryComplete(e); } @@ -918,7 +916,7 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella "processed in {DurationMs} ms with {ReceivedCount} replies received " + "out of {ExpectedCount} expected replies", req.Id, - messageType, + req.Message.Content.Type, stopwatch.ElapsedMilliseconds, receivedCount, req.ExpectedResponses); @@ -980,13 +978,12 @@ Guid reqId ); } - private readonly struct MessageRequest + private class MessageRequest { public MessageRequest( in Guid id, - NetMQMessage message, + Message message, BoundPeer peer, - DateTimeOffset requestedTime, in int expectedResponses, Channel channel, CancellationToken cancellationToken) @@ -994,7 +991,6 @@ public MessageRequest( Id = id; Message = message; Peer = peer; - RequestedTime = requestedTime; ExpectedResponses = expectedResponses; Channel = channel; CancellationToken = cancellationToken; @@ -1002,12 +998,10 @@ public MessageRequest( public Guid Id { get; } - public NetMQMessage Message { get; } + public Message Message { get; } public BoundPeer Peer { get; } - public DateTimeOffset RequestedTime { get; } - public int ExpectedResponses { get; } public Channel Channel { get; } diff --git a/test/Libplanet.Net.Tests/Messages/BlockHashesTest.cs b/test/Libplanet.Net.Tests/Messages/BlockHashesTest.cs index 52042e8ea1f..5fd1fe008e9 100644 --- a/test/Libplanet.Net.Tests/Messages/BlockHashesTest.cs +++ b/test/Libplanet.Net.Tests/Messages/BlockHashesTest.cs @@ -23,20 +23,22 @@ public void Dispose() public void Decode() { BlockHash[] blockHashes = GenerateRandomBlockHashes(100L).ToArray(); - var msg = new BlockHashesMsg(blockHashes); - Assert.Equal(blockHashes, msg.Hashes); + var messageContent = new BlockHashesMsg(blockHashes); + Assert.Equal(blockHashes, messageContent.Hashes); var privateKey = new PrivateKey(); AppProtocolVersion apv = AppProtocolVersion.Sign(privateKey, 3); var peer = new BoundPeer(privateKey.PublicKey, new DnsEndPoint("0.0.0.0", 1234)); var messageCodec = new NetMQMessageCodec(); NetMQMessage encoded = messageCodec.Encode( - msg, - privateKey, - apv, - peer, - DateTimeOffset.UtcNow); + new Message( + messageContent, + apv, + peer, + DateTimeOffset.UtcNow, + null), + privateKey); BlockHashesMsg restored = (BlockHashesMsg)messageCodec.Decode(encoded, true).Content; - Assert.Equal(msg.Hashes, restored.Hashes); + Assert.Equal(messageContent.Hashes, restored.Hashes); } private static IEnumerable GenerateRandomBlockHashes(long count) diff --git a/test/Libplanet.Net.Tests/Messages/MessageTest.cs b/test/Libplanet.Net.Tests/Messages/MessageTest.cs index 453e68c447f..3ffac550e43 100644 --- a/test/Libplanet.Net.Tests/Messages/MessageTest.cs +++ b/test/Libplanet.Net.Tests/Messages/MessageTest.cs @@ -28,10 +28,10 @@ public void BlockHeaderMsg() default(Address)); var dateTimeOffset = DateTimeOffset.UtcNow; Block genesis = ProposeGenesisBlock(GenesisProposer); - var message = new BlockHeaderMsg(genesis.Hash, genesis.Header); + var messageContent = new BlockHeaderMsg(genesis.Hash, genesis.Header); var codec = new NetMQMessageCodec(); - NetMQMessage raw = - codec.Encode(message, privateKey, apv, peer, dateTimeOffset); + NetMQMessage raw = codec.Encode( + new Message(messageContent, apv, peer, dateTimeOffset, null), privateKey); var parsed = codec.Decode(raw, true); Assert.Equal(peer, parsed.Remote); } @@ -39,7 +39,7 @@ public void BlockHeaderMsg() [Fact] public void InvalidCredential() { - var message = new PingMsg(); + var ping = new PingMsg(); var privateKey = new PrivateKey(); var apv = new AppProtocolVersion( 1, @@ -51,7 +51,8 @@ public void InvalidCredential() var badPrivateKey = new PrivateKey(); var codec = new NetMQMessageCodec(); Assert.Throws(() => - codec.Encode(message, badPrivateKey, apv, peer, timestamp)); + codec.Encode( + new Message(ping, apv, peer, timestamp, null), badPrivateKey)); } [Fact] @@ -68,11 +69,13 @@ public void UseInvalidSignature() default(Address)); var ping = new PingMsg(); var codec = new NetMQMessageCodec(); - var netMqMessage = codec.Encode(ping, privateKey, apv, peer, timestamp).ToArray(); + var netMqMessage = codec.Encode( + new Message(ping, apv, peer, timestamp, null), privateKey).ToArray(); // Attacker var fakePeer = new BoundPeer(privateKey.PublicKey, new DnsEndPoint("1.2.3.4", 0)); - var fakeMessage = codec.Encode(ping, privateKey, apv, fakePeer, timestamp).ToArray(); + var fakeMessage = codec.Encode( + new Message(ping, apv, fakePeer, timestamp, null), privateKey).ToArray(); var frames = new NetMQMessage(); frames.Push(netMqMessage[4]); diff --git a/test/Libplanet.Net.Tests/Messages/NetMQMessageCodecTest.cs b/test/Libplanet.Net.Tests/Messages/NetMQMessageCodecTest.cs index b344eb44e21..0ebf3110834 100644 --- a/test/Libplanet.Net.Tests/Messages/NetMQMessageCodecTest.cs +++ b/test/Libplanet.Net.Tests/Messages/NetMQMessageCodecTest.cs @@ -63,16 +63,18 @@ public void CheckMessages(MessageContent.MessageType type) new Bencodex.Types.Integer(0), ImmutableArray.Empty, default(Address)); - var message = CreateMessage(type); + var messageContent = CreateMessage(type); var codec = new NetMQMessageCodec(); NetMQMessage raw = - codec.Encode(message, privateKey, apv, peer, dateTimeOffset); + codec.Encode( + new Message(messageContent, apv, peer, dateTimeOffset, null), + privateKey); var parsed = codec.Decode(raw, true); Assert.Equal(apv, parsed.Version); Assert.Equal(peer, parsed.Remote); Assert.Equal(dateTimeOffset, parsed.Timestamp); - Assert.IsType(message.GetType(), parsed.Content); - Assert.Equal(message.DataFrames, parsed.Content.DataFrames); + Assert.IsType(messageContent.GetType(), parsed.Content); + Assert.Equal(messageContent.DataFrames, parsed.Content.DataFrames); } private MessageContent CreateMessage(MessageContent.MessageType type)