Streams TCP Transport with Hand-Written Protobuf Deserialization #4594
base: dev
Conversation
… in perf. Could still use more tweaks.
…for IO.TCP Bindings/connections to have buffer pool sizes set separate from default settings. Use Compiled Expression to avoid copy of Protobuf ByteString to IO.Bytestring on Streams Transport Write. Create internal IO.Bytestring ReadOnlyCompacted to minimize copying on Reads for Streams Transport.
…ort to get rid of one more copy.
…ransport. Use .ToArray() instead of .ToList() in ActorPath
This reverts commit 4b3d081.
- LRUSegmentCache for caching based on ReadOnlySpan<Byte> segments. - Move All parts of Streaming TCP transport possible to their own Dispatcher for stability
I'll take a look!
…ppy about it relatively speaking. Use ValueTuple return version of SplitNameAndUid to lower allocations for things like ActorPath op /
What this needs (copied/adapted from #4581), AKA "I need help!":
Nice to have:
@Aaronontheweb I added what this needs, in case anyone in the community or elsewhere wishes to help! If some of this is too extreme, we can move the important parts (scheduler, batch tweaks, etc.) and still do a couple more zero-copy tricks on one of the other branches. FWIW I'd say this is the most 'extreme' branch; the one where we are only serializing the protobuf payloads I would say is pretty dang safe.
@to11mtm have any before / after performance numbers you can share?
These are round-about numbers ATM, will try to run more real benchmarks later. But these numbers are fair in that they do show some scale: RemotePingPong, i7-8750H, 32GB RAM. Everything (Materializer, IO.TCP, remoting) running under single
I added one more bench number above, since I think it does a decent job of showing that the more threadpools we have involved, the more things seem to hurt. That said, I think these are some good baselines for everyone to see where things are at.
@to11mtm started work reviewing this PR - been working my way through it. I'll go ahead and try to run these benchmarks locally too.
The mutual exclusion benefits don't really show up well under single connection benchmarks - but we definitely need to be judicious about limiting the number of independent threadpools we use. Returns diminish quickly on each additional one.
Leaving my initial comments up - not done yet, but best I could get to this evening
AppDomain.CurrentDomain.UnhandledException += (sender, eventArgs) =>
{
    Console.WriteLine(eventArgs.ExceptionObject as Exception);
    Console.WriteLine("STACKOVERFLOW");
How did this come up inside the benchmark?
Uhhhh, it rather involved some Spans and FastHash while I was debugging... :)
LOL
Console.WriteLine(sender);
Console.WriteLine(eventArgs.ExceptionObject as Exception);
};
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.AboveNormal;
We should probably port that change back into a separate PR - good idea.
edit: just to clarify - this change is so good and small that it should find its way into the dev
branch straight away
@@ -0,0 +1,313 @@
// //----------------------------------------------------------------------- |
Need to rename this file
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
Need to reference common.props here.
For example:
akka.net/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj
Lines 2 to 10 in 3705c8c
<Import Project="..\..\..\common.props" />
<PropertyGroup>
  <AssemblyTitle>Akka.Persistence.Sqlite</AssemblyTitle>
  <Description>Akka.NET Persistence journal and snapshot store backed by SQLite.</Description>
  <TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
  <PackageTags>$(AkkaPackageTags);persistence;eventsource;sql;sqlite</PackageTags>
  <GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
try
{
    return _queue?.OfferAsync(IO.ByteString.FromBytes(payload.ToByteArray())).Result is QueueOfferResult
Should we just change the API here to make Write an async method? Having the blocking .Result call here, and the same types of stuff we have to do for DotNetty, is less than ideal.
(Question still valid on live code so I'll just answer here:)
It probably wouldn't hurt. I was surprised at how much time .OfferAsync was taking in the profiler.
IIRC the challenge was that the actors calling it aren't ReceiveActors, and I don't know how to make the other ones work with async :)
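A hypothetical sketch of the shape an async Write could take, to avoid the blocking .Result on the hot path. This is not the PR's code: Channel<byte[]> stands in for the Akka.Streams source queue, byte[] for IO.ByteString, and the bounded Wait mode stands in for queue backpressure.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncWriteSketch
{
    // Bounded capacity mirrors the transport's 512-deep source queue;
    // FullMode.Wait makes a full queue await instead of dropping or blocking.
    private static readonly Channel<byte[]> Queue =
        Channel.CreateBounded<byte[]>(new BoundedChannelOptions(512)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

    // Awaiting the enqueue replaces the blocking .Result call.
    public static async Task<bool> WriteAsync(byte[] payload)
    {
        await Queue.Writer.WriteAsync(payload);
        return true;
    }
}
```

The caller (here, the transport's association handle) would then await the result instead of synchronously joining on the offer task.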
public sealed override string SchemeIdentifier { get; protected set; } =
    "tcp";
private ActorMaterializer _mat;
private DotNettyTransportSettings Settings;
nice, for making this backwards compatible...
ex =>
{
    handle.Notify(
        new Disassociated(DisassociateInfo.Unknown));
Good way to error-handle this
listener.Notify(new InboundAssociation(handle));
});
return NotUsed.Instance;
}).ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Left).Run(_mat);
We need to pass in some hints to the materializer here as to which dispatcher the materialized actors should run on, IIRC
the /core/akka.remote/ code has dispatcher hints :)
Ah ok - I was wondering if I was missing something
.Queue<IO.ByteString>(512, OverflowStrategy.DropNew)
.Via(
    Framing.SimpleFramingProtocolEncoder(Settings.MaxFrameSize))
.GroupedWithin(256, TimeSpan.FromMilliseconds(40))
Nice, a much more concise way of doing the "batching" than what our current transport does. We'll still need a flag to disable the batching stage via config for connections with low traffic (i.e. a system like Lighthouse)
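One possible shape for that disable flag, sketched in HOCON. The batch-writes key and its placement are hypothetical, not part of this PR:

```
akka.remote.streaming.tcp {
  # Hypothetical flag: when off, the GroupedWithin batching stage would be
  # skipped so low-traffic systems (e.g. Lighthouse) flush writes immediately.
  batch-writes = on
}
```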
Not gonna lie, I want to FRAME that block of code. While I know there's room for improvement it just feels so elegant.
{
    //TODO: Improve batching voodoo in pipeline.
    return Source
        .Queue<IO.ByteString>(512, OverflowStrategy.DropNew)
Probably better for us if we just reject the new write to the queue, so the EndpointWriter can back off.
@Aaronontheweb my bad on this, you want to look at /src/core/akka.remote/ for the most up to date bits.
Moving everything into its own /contrib/ will be a bit more work because of all the internal things a transport needs.
I think this is a great start - I’m very impressed with your hard work. I think this will be an amazing improvement over DotNetty
Looking forward to reading the rest of it
No problem. In fact, I apologize in advance for AkkaPduCodec.cs. You may want to skip that for the moment; right now it is a poster child for "Why Projects Have Contributor Guidelines".
…m/to11mtm/akka.net into remote-full-manual-protobuf-deser
…g, minor change to HWTS.
@@ -182,16 +202,18 @@ private static async Task<(bool, long, int)> Benchmark(int numberOfClients, long
    throw new Exception("Received report that 1 or more remote actor is unable to begin the test. Aborting run.");
}

var rng = new Random();
var rand = new byte[2048];
rng.NextBytes(rand);
I had this bit in here because it was interesting to note that some of my tweaks had to change when I started throwing somewhat larger messages; ping-pong with payloads this size differs a LOT from smaller messages, and changing some of the batching parameters (i.e. count, size) usually brings things back in line.
@@ -359,7 +359,41 @@ akka {
helios.tcp.transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote.Transport.Helios"

### Default configuration for the DotNetty based transport drivers

streaming.tcp {
Left a LOT of the options out here even though they can be read from code. Idea here was to make migration from DotNetty as easy as possible
@@ -602,9 +636,23 @@ akka {
default-remote-dispatcher {
  type = ForkJoinDispatcher
  executor = fork-join-executor
  throughput = 30
30 is the default, but I kinda enjoy the cognitive hint to users that this is a number they can touch.
(Also, it protects against the default being changed elsewhere.)
FastMessageParser.PayloadParser messageOptionSerializedMessage,
    IActorRef messageOptionSenderOptional);
void Dispatch(IInternalActorRef messageOptionRecipient,
This last overload is an experiment gone wrong in async deserialization and should be removed.
IActorRef messageOptionSenderOptional);
}

public class DefaultMessageDispatcherActor : ActorBase
This needs to be removed or put somewhere else to be figured out. It was part of an attempt to parallelize the remote dispatch. Failed miserably :)
//_sendArgs.SetBuffer(_dataToSend);
#pragma warning disable 4014
Task.Run(async () =>
{
    foreach (var byteString in _dataToSend)
    {
        foreach (var buffer in byteString.Buffers)
        {
            await _connection.SslStream.WriteAsync(
                buffer.Array,
                buffer.Offset, buffer.Count);
        }
    }

return NotUsed.Instance;
This is probably Bad and Wrong. There's definitely some ugly going on:
- Task.Run is a smell
- Lots of back-and-forth here with awaits when we have small buffers (i.e. after manual serialization/framing/batching)
public ByteString(IList<ByteString> buffers)
{
    _count = buffers.Sum(s => s.Count);
    _buffers = buffers.SelectMany(r => r._buffers).ToArray();
}
So, this is useful for some cases, but looking at it I can't help but wish we had a nice way to pack this and fetch lazily (so we don't have to do this SelectMany+ToArray).
internal ByteBuffer ReadOnlyCompacted()
{
    var c = this.Compact();
    return c._buffers[0];
}
The idea here is to get a single contiguous ArraySegment back; for smaller messages, this will most likely not copy/move anything in the .Compact() step.
In either case, the ArraySegment (ByteBuffer) can then be used directly by either the 'proper' Protobuf deserializer or a custom deserializer.
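A standalone sketch of that compaction idea with simplified types (this is not the PR's IO.ByteString; names are illustrative): if the data already lives in a single segment, hand it back as-is with zero copying; otherwise copy once into a contiguous array a deserializer can consume directly.

```csharp
using System;
using System.Collections.Generic;

public static class CompactionSketch
{
    // Returns one contiguous segment covering all input segments.
    public static ArraySegment<byte> ReadOnlyCompacted(IReadOnlyList<ArraySegment<byte>> buffers)
    {
        // Fast path: a single backing segment needs no copy at all,
        // mirroring the "smaller messages most likely don't copy" behavior.
        if (buffers.Count == 1)
            return buffers[0];

        var total = 0;
        foreach (var b in buffers) total += b.Count;

        // Slow path: one copy into a fresh contiguous array.
        var combined = new byte[total];
        var offset = 0;
        foreach (var b in buffers)
        {
            Buffer.BlockCopy(b.Array, b.Offset, combined, offset, b.Count);
            offset += b.Count;
        }
        return new ArraySegment<byte>(combined);
    }
}
```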
private static ThreadLocal<Random> _rng = new ThreadLocal<Random>(() => new Random(Interlocked.Increment(ref _seed)));
[ThreadStatic]
private static Random _rng2;
//private static ThreadLocal<Random> _rng = new ThreadLocal<Random>(() => new Random(Interlocked.Increment(ref _seed)));
Avoiding ThreadLocal wherever we don't need it for some special reason.
Reasoning here: https://ayende.com/blog/189761-A/production-postmortem-the-slow-slowdown-of-large-systems
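The pattern in question, as a minimal standalone sketch (names illustrative): ThreadLocal<T> carries per-instance tracking machinery, while a [ThreadStatic] field is a plain per-thread slot. The catch is that field initializers only run on the first thread, so [ThreadStatic] needs a lazy null-check init:

```csharp
using System;
using System.Threading;

public static class PerThreadRandom
{
    private static int _seed = Environment.TickCount;

    // [ThreadStatic] gives each thread its own slot with no tracking
    // structures kept alive behind the scenes.
    [ThreadStatic]
    private static Random _rng;

    // Lazy init on each thread's first access; field initializers would
    // only have run on the thread that loaded the type.
    public static Random Instance =>
        _rng ?? (_rng = new Random(Interlocked.Increment(ref _seed)));
}
```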
private static readonly ThreadLocal<object[]> CurrentInterpreter = new ThreadLocal<object[]>(() => new object[1]);
[ThreadStatic]
private static object[] CurrentInterpreter;
Changing to ThreadStatic because of https://ayende.com/blog/189761-A/production-postmortem-the-slow-slowdown-of-large-systems
Last One, Promise. :)
This branch has a few more tweaks:
- LRUSegmentCache for using Span<byte> for input (i.e. UTF8 actor paths, only copying to string if needed)
- ThreadStatic instead of ThreadLocal where possible