1
+ // //-----------------------------------------------------------------------
2
+ // // <copyright file="AkkaPduCodecBenchmark.cs" company="Akka.NET Project">
3
+ // // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4
+ // // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5
+ // // </copyright>
6
+ // //-----------------------------------------------------------------------
7
+
8
+ using System ;
9
+ using System . Threading ;
10
+ using System . Threading . Tasks ;
11
+ using Akka . Actor ;
12
+ using Akka . Actor . Dsl ;
13
+ using Akka . Benchmarks . Configurations ;
14
+ using Akka . Configuration ;
15
+ using Akka . Remote ;
16
+ using Akka . Remote . Serialization ;
17
+ using Akka . Remote . Transport ;
18
+ using BenchmarkDotNet . Attributes ;
19
+ using BenchmarkDotNet . Loggers ;
20
+ using Google . Protobuf ;
21
+
22
+ namespace Akka . Benchmarks . Remoting
23
+ {
24
+ [ Config ( typeof ( MicroBenchmarkConfig ) ) ]
25
+ public class AkkaPduCodecBenchmark
26
+ {
27
+ public const int Operations = 10_000 ;
28
+
29
+ private ExtendedActorSystem _sys1 ;
30
+ private ExtendedActorSystem _sys2 ;
31
+ private IRemoteActorRefProvider _rarp ;
32
+
33
+ private Config _config = @"akka.actor.provider = remote
34
+ akka.remote.dot-netty.tcp.port = 0" ;
35
+
36
+ private IActorRef _senderActorRef ;
37
+ private IActorRef _localReceiveRef ;
38
+ private RemoteActorRef _remoteReceiveRef ;
39
+ private RemoteActorRef _remoteSenderRef ;
40
+
41
+ private Address _addr1 ;
42
+ private Address _addr2 ;
43
+ private AkkaPduProtobuffCodec _recvCodec ;
44
+ private AkkaPduProtobuffCodec _sendCodec ;
45
+
46
+ /// <summary>
47
+ /// The message we're going to serialize
48
+ /// </summary>
49
+ private readonly object _message = "foobar" ;
50
+
51
+ private readonly Ack _lastAck = new Ack ( - 1 ) ;
52
+
53
+ private ByteString _fullDecode ;
54
+ private ByteString _pduDecoded ;
55
+ private Akka . Remote . Serialization . Proto . Msg . Payload _payloadDecoded ;
56
+
57
+ [ GlobalSetup ]
58
+ public async Task Setup ( )
59
+ {
60
+ _sys1 = ( ExtendedActorSystem ) ActorSystem . Create ( "BenchSys" , _config ) ;
61
+ _sys2 = ( ExtendedActorSystem ) ActorSystem . Create ( "BenchSys" , _config ) ;
62
+ _rarp = RARP . For ( _sys1 ) . Provider ;
63
+ _addr1 = _rarp . DefaultAddress ;
64
+ _addr2 = RARP . For ( _sys2 ) . Provider . DefaultAddress ;
65
+
66
+ _senderActorRef =
67
+ _sys2 . ActorOf ( act => { act . ReceiveAny ( ( o , context ) => context . Sender . Tell ( context . Sender ) ) ; } ,
68
+ "sender1" ) ;
69
+
70
+ _localReceiveRef = _sys1 . ActorOf ( act => { act . ReceiveAny ( ( o , context ) => context . Sender . Tell ( context . Sender ) ) ; } ,
71
+ "recv1" ) ;
72
+
73
+ // create an association
74
+ _remoteReceiveRef = ( RemoteActorRef ) ( await _sys2 . ActorSelection ( new RootActorPath ( RARP . For ( _sys1 ) . Provider . DefaultAddress ) / "user" /
75
+ _localReceiveRef . Path . Name ) . ResolveOne ( TimeSpan . FromSeconds ( 3 ) ) ) ;
76
+
77
+ _remoteSenderRef = ( RemoteActorRef ) ( await _sys1 . ActorSelection ( new RootActorPath ( RARP . For ( _sys2 ) . Provider . DefaultAddress ) / "user" /
78
+ _senderActorRef . Path . Name ) . ResolveOne ( TimeSpan . FromSeconds ( 3 ) ) ) ;
79
+
80
+ _recvCodec = new AkkaPduProtobuffCodec ( _sys1 ) ;
81
+ _sendCodec = new AkkaPduProtobuffCodec ( _sys2 ) ;
82
+ _fullDecode = CreatePayloadPdu ( ) ;
83
+ _pduDecoded = ( ( Payload ) _recvCodec . DecodePdu ( _fullDecode ) ) . Bytes ;
84
+ _payloadDecoded = _recvCodec . DecodeMessage ( _pduDecoded , _rarp , _addr1 ) . MessageOption . SerializedMessage ;
85
+ }
86
+
87
+ [ GlobalCleanup ]
88
+ public async Task Cleanup ( )
89
+ {
90
+
91
+ void PrintCacheStats ( string prefix , ActorSystem sys )
92
+ {
93
+ var resolveCache = ActorRefResolveThreadLocalCache . For ( sys ) ;
94
+ var pathCache = ActorPathThreadLocalCache . For ( sys ) ;
95
+ var addressCache = AddressThreadLocalCache . For ( sys ) ;
96
+
97
+ ConsoleLogger . Default . WriteLine ( LogKind . Result ,
98
+ $ "[{ prefix } ] ResolveCache entries: [{ resolveCache . Cache . Stats . Entries } ]") ;
99
+ ConsoleLogger . Default . WriteLine ( LogKind . Result ,
100
+ $ "[{ prefix } ] PathCache entries: [{ pathCache . Cache . Stats . Entries } ]") ;
101
+ ConsoleLogger . Default . WriteLine ( LogKind . Result ,
102
+ $ "[{ prefix } ] AddressCache entries: [{ addressCache . Cache . Stats . Entries } ]") ;
103
+ }
104
+
105
+ PrintCacheStats ( "Addr1" , _sys1 ) ;
106
+ PrintCacheStats ( "Addr2" , _sys2 ) ;
107
+
108
+ var resolveCache = ActorRefResolveThreadLocalCache . For ( _sys1 ) ;
109
+ var pathCache = ActorPathThreadLocalCache . For ( _sys1 ) ;
110
+ var addressCache = AddressThreadLocalCache . For ( _sys1 ) ;
111
+
112
+ var senderResolveCache = ActorRefResolveThreadLocalCache . For ( _sys2 ) ;
113
+ var senderPathCache = ActorPathThreadLocalCache . For ( _sys2 ) ;
114
+ var senderAddressCache = AddressThreadLocalCache . For ( _sys2 ) ;
115
+
116
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr1] Used ResolveCache for recipient? { resolveCache . Cache . TryGet ( _remoteReceiveRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
117
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr1] Used PathCache for recipient? { pathCache . Cache . TryGet ( _remoteReceiveRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
118
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr1] Used ResolveCache for sender? { resolveCache . Cache . TryGet ( _remoteSenderRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
119
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr1] Used PathCache for sender? { pathCache . Cache . TryGet ( _remoteSenderRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
120
+
121
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr2] Used ResolveCache for recipient? { senderResolveCache . Cache . TryGet ( _remoteReceiveRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
122
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr2] Used PathCache for recipient? { senderPathCache . Cache . TryGet ( _remoteReceiveRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
123
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr2] Used ResolveCache for sender? { senderResolveCache . Cache . TryGet ( _senderActorRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
124
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr2] Used PathCache for sender? { senderPathCache . Cache . TryGet ( _senderActorRef . Path . ToSerializationFormat ( ) , out _ ) } ") ;
125
+
126
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr1] Used AddressCache for sys1? { addressCache . Cache . TryGet ( _addr2 . ToString ( ) , out _ ) } ") ;
127
+ ConsoleLogger . Default . WriteLine ( LogKind . Result , $ "[Addr2] Used AddressCache for sys2? { senderAddressCache . Cache . TryGet ( _addr1 . ToString ( ) , out _ ) } ") ;
128
+
129
+ await Task . WhenAll ( _sys1 . Terminate ( ) , _sys2 . Terminate ( ) ) ;
130
+ }
131
+
132
+ /// <summary>
133
+ /// Simulates the write-side of the wire
134
+ /// </summary>
135
+ [ Benchmark ( OperationsPerInvoke = Operations ) ]
136
+ public void WritePayloadPdu ( )
137
+ {
138
+ for ( var i = 0 ; i < Operations ; i ++ )
139
+ {
140
+ CreatePayloadPdu ( ) ;
141
+ }
142
+ }
143
+
144
+ /// <summary>
145
+ /// Simulates the read-side of the wire
146
+ /// </summary>
147
+ [ Benchmark ( OperationsPerInvoke = Operations ) ]
148
+ public void DecodePayloadPdu ( )
149
+ {
150
+ for ( var i = 0 ; i < Operations ; i ++ )
151
+ {
152
+ var pdu = _recvCodec . DecodePdu ( _fullDecode ) ;
153
+ if ( pdu is Payload p )
154
+ {
155
+ var msg = _recvCodec . DecodeMessage ( p . Bytes , _rarp , _addr1 ) ;
156
+ var deserialize = MessageSerializer . Deserialize ( _sys1 , msg . MessageOption . SerializedMessage ) ;
157
+ }
158
+ }
159
+ }
160
+
161
+ [ Benchmark ( OperationsPerInvoke = Operations ) ]
162
+ public void DecodePduOnly ( )
163
+ {
164
+ for ( var i = 0 ; i < Operations ; i ++ )
165
+ {
166
+ var pdu = _recvCodec . DecodePdu ( _fullDecode ) ;
167
+ }
168
+ }
169
+
170
+ [ Benchmark ( OperationsPerInvoke = Operations ) ]
171
+ public void DecodeMessageOnly ( )
172
+ {
173
+ for ( var i = 0 ; i < Operations ; i ++ )
174
+ {
175
+ var msg = _recvCodec . DecodeMessage ( _pduDecoded , _rarp , _addr1 ) ;
176
+ }
177
+ }
178
+
179
+ [ Benchmark ( OperationsPerInvoke = Operations ) ]
180
+ public void DeserializePayloadOnly ( )
181
+ {
182
+ for ( var i = 0 ; i < Operations ; i ++ )
183
+ {
184
+ var deserialize = MessageSerializer . Deserialize ( _sys1 , _payloadDecoded ) ;
185
+ }
186
+ }
187
+
188
+ private ByteString CreatePayloadPdu ( )
189
+ {
190
+ return _sendCodec . ConstructPayload ( _sendCodec . ConstructMessage ( _remoteReceiveRef . LocalAddressToUse , _remoteReceiveRef ,
191
+ MessageSerializer . Serialize ( _sys2 , _addr2 , _message ) , _senderActorRef , null , _lastAck ) ) ;
192
+ }
193
+ }
194
+ }
0 commit comments