Streams TCP Transport with Hand-Written Protobuf Deserialization #4594

Draft: wants to merge 27 commits into base: dev
Conversation

@to11mtm (Member) commented Oct 28, 2020

Last One, Promise. :)

This branch has a few more tweaks:

  • Hand-written deserializer for remote Protobuf payloads to minimize copies on read.
  • LRUSegmentCache keyed on Span<byte> input (i.e. UTF8 actor paths, only copying to a string when needed).
  • Switching from ThreadLocal to ThreadStatic where possible.
  • Moving the stream materializer for the streaming transport to its own dispatcher config. At the moment it uses the thread pool, which lowers performance a bit, but hopefully that can be brought back up with tweaks to batch settings or thread pool throughput.
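The span-keyed cache idea above can be sketched roughly as follows. This is a minimal illustration, not the PR's LRUSegmentCache: the type and member names are invented, and real LRU eviction plus hash-collision handling are omitted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a segment cache keyed on raw UTF8 bytes: the
// lookup hashes the span directly and only allocates a string (and a
// copy of the key bytes) on a cache miss.
public sealed class SegmentCache
{
    private readonly Dictionary<int, (byte[] Key, string Value)> _entries =
        new Dictionary<int, (byte[], string)>();

    public string GetOrAdd(ReadOnlySpan<byte> utf8)
    {
        var hash = ComputeHash(utf8);
        if (_entries.TryGetValue(hash, out var entry) && utf8.SequenceEqual(entry.Key))
            return entry.Value; // hit: no allocation at all

        var key = utf8.ToArray(); // miss: copy the bytes exactly once
        var value = System.Text.Encoding.UTF8.GetString(key);
        _entries[hash] = (key, value);
        return value;
    }

    private static int ComputeHash(ReadOnlySpan<byte> data)
    {
        var h = 17;
        foreach (var b in data) h = h * 31 + b;
        return h;
    }
}
```

A real implementation would also need eviction (the LRU part) and a bound on entry count, so a flood of unique actor paths can't grow the cache without limit.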

…for IO.TCP Bindings/connections to have buffer pool sizes set separate from default settings.

Use Compiled Expression to avoid copy of Protobuf ByteString to IO.Bytestring on Streams Transport Write.
Create internal IO.Bytestring ReadOnlyCompacted to minimize copying on Reads for Streams Transport.
…ransport.

Use .ToArray() instead of .ToList() in ActorPath
This reverts commit 4b3d081.
- LRUSegmentCache for caching based on ReadOnlySpan<Byte> segments.
- Move All parts of Streaming TCP transport possible to their own Dispatcher for stability
@Aaronontheweb (Member)

I'll take a look!

…ppy about it relatively speaking.

Use ValueTuple return version of SplitNameAndUid to lower allocations for things like ActorPath op /
@to11mtm (Member Author) commented Oct 29, 2020

What This needs, copied/adapted from #4581

Needs (AKA "I need help!"):

  • Tests around SSL Concept
  • Any adaptations necessary for SSL in transport (I think this will actually be minimal thanks to the pattern of it just being an extra Inet.SocketOption)
  • Hardcoded entries moved into configuration (I'll try to work on this if I have time.)
    • Config entries are no longer hard coded, but we should decide whether to fill in certain parts or fallback to Dotnetty Settings.
  • Hardening around using no delay in batch grouping. RemotePingpong becomes unstable in this scenario.
  • Potentially testing/hardening around public/private configuration scenarios. TCP is not my forte.
  • Checks against DotNetty/Helios compatibility. When I squint at DotNetty I'm hopeful the Akka.Streams framing is the same. If it's not, we would have to make framing stages for that as well as for Helios. But it certainly seems simple.
  • MUST Check/Fix: Not entirely certain the protobuf varint128 code handles negative numbers correctly. I borrowed it from the Google repo but might have picked something that didn't do negative numbers.
  • This is a very strict and opinionated parser. We may want to provide a fallback mode in case older clients encode messages in a way this parser can't decode. Google's protobuf serializer does NOT guarantee a deterministic order of fields, but this parser expects one.
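On the varint point above: in protobuf's wire format, negative int32/int64 values are sign-extended to 64 bits before base-128 encoding, so they always occupy the full 10 bytes on the wire (e.g. -1 encodes as nine 0xFF bytes followed by 0x01). A decoder that only loops far enough for non-negative 32-bit values will mis-parse them. A minimal 64-bit decoder sketch (not the PR's code):

```csharp
using System;

static class Varint
{
    // Reads a base-128 varint as a 64-bit value. Letting the shift run up
    // to 63 (10 iterations) is what makes sign-extended negative numbers
    // decode correctly; stopping at 5 bytes would only cover non-negative
    // 32-bit values.
    public static long ReadVarint64(ReadOnlySpan<byte> input, ref int offset)
    {
        ulong result = 0;
        for (var shift = 0; shift < 64; shift += 7)
        {
            var b = input[offset++];
            result |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0)
                return unchecked((long)result);
        }
        throw new FormatException("Malformed varint: longer than 10 bytes.");
    }
}
```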

Nice to have:

  • Someone who knows streams better than I may know a few tricks to make the pipeline better. The places I put Async stages at were based on repeated RemotePingPong benchmarks as well as some debug checks (i.e. checking how many bytes were being pushed in the final bufferedSelectFlow stage.)

@to11mtm (Member Author) commented Oct 29, 2020

@Aaronontheweb I added what this needs if anyone in the community or elsewhere wishes to help!

If some of this is too extreme, we can move the important parts (scheduler, batch tweaks, etc.) and still do a couple more zero-copy tricks on one of the other branches.

FWIW I'd say this is the most 'extreme' branch. The one where we are only serializing the protobuf payloads is pretty dang safe, I'd say.

@Aaronontheweb (Member)

@to11mtm have any before / after performance numbers you can share?

@to11mtm (Member Author) commented Oct 30, 2020

These are round-about numbers ATM, will try to run more real benchmarks later.

But these numbers are fair in that they do show some scale:

RemotePingPong, i7-8750H, 32GB RAM.

Everything (materializer, IO.TCP, remoting) running under a single DedicatedThreadPool:

  • Streaming, no special Protobuf serialization (i.e. just minimizing copies from ByteString to byte array for serialization/deserialization):
    • ~200,000-210,000 messages per sec for numClients > 1

Dedicated thread pool for materializer/IO.TCP, remoting under the remoting thread pool:

  • Streaming, no special Protobuf serialization:
    • ~170,000-180,000 messages per sec for numClients > 1
  • Streaming, hand-written Protobuf serializer/deserializer (this branch):
    • ~220,000-240,000 messages per sec for numClients > 1
  • DotNetty, TCP:
    • ~220,000-240,000 messages per sec for numClients > 1

Running all streaming transport bits (i.e. materializer, remoting, IO.TCP actors) on akka.actor.default-dispatcher:

  • Streaming, no special Protobuf serialization:
    • ~210,000-240,000 messages per sec for numClients > 1
  • Streaming, hand-written Protobuf serializer/deserializer (this branch):
    • ~240,000-300,000 messages per sec for numClients > 1

@to11mtm (Member Author) commented Oct 30, 2020

I added one more bench number above, since I think it does a decent job of showing that the more thread pools we have involved, the more things seem to hurt.

That said, I think these are some good baselines for everyone to see where things are at.

@Aaronontheweb (Member)

@to11mtm I've started reviewing this PR and have been working my way through it. I'll go ahead and try to run these benchmarks locally too.

@Aaronontheweb (Member)

I added one more bench number to above, since I think it does a decent job of showing that the more threadpools we have involved the more things seem to hurt.

The mutual exclusion benefits don't really show up well under single connection benchmarks - but we definitely need to be judicious about limiting the number of independent threadpools we use. Returns diminish quickly on each additional one.

@Aaronontheweb (Member) left a comment

Leaving my initial comments up - not done yet, but best I could get to this evening

AppDomain.CurrentDomain.UnhandledException += (sender, eventArgs) =>
{
Console.WriteLine(eventArgs.ExceptionObject as Exception);
Console.WriteLine("STACKOVERFLOW");
Member:

How did this come up inside the benchmark?

Member Author:

Uhhhh, it rather involved some Spans and FastHash while I was debugging... :)

Member:

LOL

Console.WriteLine(sender);
Console.WriteLine(eventArgs.ExceptionObject as Exception);
};
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.AboveNormal;
Member:

We should probably port that change back into a separate PR - good idea.

edit: just to clarify - this change is so good and small that it should find its way into the dev branch straight away

@@ -0,0 +1,313 @@
// //-----------------------------------------------------------------------
Member:

Need to rename this file

@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

Member:

Need to reference common.props here.

For example:

<Import Project="..\..\..\common.props" />
<PropertyGroup>
<AssemblyTitle>Akka.Persistence.Sqlite</AssemblyTitle>
<Description>Akka.NET Persistence journal and snapshot store backed by SQLite.</Description>
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
<PackageTags>$(AkkaPackageTags);persistence;eventsource;sql;sqlite</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>


try
{
return _queue?.OfferAsync(IO.ByteString.FromBytes(payload.ToByteArray())).Result is QueueOfferResult
Member:

Should we just change the API here to make Write an async method? Having the blocking .Result call here and the same types of stuff we have to do for DotNetty is less than ideal.

Member Author:

(Question still valid on live code so I'll just answer here:)
It probably wouldn't hurt. I was surprised at how much time .OfferAsync was taking in the profiler.

IIRC the challenge was that the actors calling it aren't ReceiveActors, and I don't know how to make the other actor types work with async. :)
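For what it's worth, one hedged sketch of how a non-ReceiveActor could still go async here. The message types (WritePayload, WriteAck, WriteFailed) are invented for illustration; this is not the PR's code:

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using IO = Akka.IO;

// Illustrative messages, not from the PR:
public sealed class WritePayload { public byte[] Bytes; }
public sealed class WriteAck { public object Result; }
public sealed class WriteFailed { public Exception Cause; }

public sealed class StreamWriterActor : UntypedActor
{
    private readonly ISourceQueueWithComplete<IO.ByteString> _queue;

    public StreamWriterActor(ISourceQueueWithComplete<IO.ByteString> queue) => _queue = queue;

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case WritePayload write:
                // PipeTo turns the Task's outcome into a message back to
                // ourselves, so no blocking .Result call is needed even in
                // an UntypedActor.
                _queue.OfferAsync(IO.ByteString.FromBytes(write.Bytes))
                      .PipeTo(Self,
                          success: r => new WriteAck { Result = r },
                          failure: ex => new WriteFailed { Cause = ex });
                break;
            case WriteFailed failed:
                // e.g. notify the association handle of a disassociate here
                break;
        }
    }
}
```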

public sealed override string SchemeIdentifier { get; protected set; } =
"tcp";
private ActorMaterializer _mat;
private DotNettyTransportSettings Settings;
Member:

nice, for making this backwards compatible...

ex =>
{
handle.Notify(
new Disassociated(DisassociateInfo.Unknown));
Member:

Good way to error-handle this

listener.Notify(new InboundAssociation(handle));
});
return NotUsed.Instance;
}).ToMaterialized(Sink.Ignore<NotUsed>(),Keep.Left).Run(_mat);
Member:

We need to pass in some hints to the materializer here as to which dispatcher the materialized actors should run on, IIRC

Member Author:

the /core/akka.remote/ code has dispatcher hints :)

Member:

Ah ok - I was wondering if I was missing something

.Queue<IO.ByteString>(512, OverflowStrategy.DropNew)
.Via(
Framing.SimpleFramingProtocolEncoder(Settings.MaxFrameSize))
.GroupedWithin(256, TimeSpan.FromMilliseconds(40))
Member:

Nice, a much more concise way of doing the "batching" than what our current transport does. We'll still need a flag to disable the batching stage via config for connections with low traffic (i.e. a system like Lighthouse)

Member Author:

Not gonna lie, I want to FRAME that block of code. While I know there's room for improvement it just feels so elegant.
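A hedged sketch of the batching flag: `enableBatching` is a hypothetical config parameter, and Backpressure is swapped in for DropNew so the EndpointWriter can back off rather than silently lose writes. This is not the PR's code.

```csharp
using System;
using System.Linq;
using Akka.Streams;
using Akka.Streams.Dsl;
using IO = Akka.IO;

public static class WritePipeline
{
    // `enableBatching` is a hypothetical config flag; low-traffic systems
    // (e.g. a Lighthouse node) could turn the GroupedWithin stage off.
    public static Source<IO.ByteString, ISourceQueueWithComplete<IO.ByteString>> Build(
        bool enableBatching, int maxFrameSize)
    {
        var framed = Source
            .Queue<IO.ByteString>(512, OverflowStrategy.Backpressure)
            .Via(Framing.SimpleFramingProtocolEncoder(maxFrameSize));

        return enableBatching
            ? framed
                .GroupedWithin(256, TimeSpan.FromMilliseconds(40))
                // fold each batch back into one ByteString for the socket
                .Select(batch => batch.Aggregate(IO.ByteString.Empty, (acc, b) => acc + b))
            : framed;
    }
}
```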

{
//TODO: Improve batching voodoo in pipeline.
return Source
.Queue<IO.ByteString>(512, OverflowStrategy.DropNew)
Member:

Probably better for us if we just reject the new write to the queue, so the EndpointWriter can backoff

Member Author:

@Aaronontheweb my bad on this, you want to look at /src/core/akka.remote/ for the most up to date bits.

Moving everything into its own /contrib/ will be a bit more work because of all the internal things a transport needs

Member:

I think this is a great start - I’m very impressed with your hard work. I think this will be an amazing improvement over DotNetty

Member:

Looking forward to reading the rest of it

@to11mtm (Member Author) commented Oct 30, 2020

Leaving my initial comments up - not done yet, but best I could get to this evening

No problem. In fact, I apologize in advance for AkkaPduCodec.cs . You may want to skip that for the moment; right now it is a poster-child for "Why Projects have Contributor Guidelines".

@@ -182,16 +202,18 @@ private static async Task<(bool, long, int)> Benchmark(int numberOfClients, long
throw new Exception("Received report that 1 or more remote actor is unable to begin the test. Aborting run.");
}

var rng = new Random();
var rand = new byte[2048];
rng.NextBytes(rand);
Member Author:

I had this bit in here because it was interesting to note that some of my tweaks had to change when I started throwing somewhat larger messages; if the message size used here differs a LOT from the smaller ping-pong messages, changing some of the batching parameters (i.e. count, size) usually brings things back in line.

@@ -359,7 +359,41 @@ akka {
helios.tcp.transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote.Transport.Helios"

### Default configuration for the DotNetty based transport drivers

streaming.tcp{
Member Author:

Left a LOT of the options out here even though they can be read from code. The idea was to make migration from DotNetty as easy as possible.

@@ -602,9 +636,23 @@ akka {
default-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
throughput = 30
Member Author:

30 is the default, but I kinda enjoy the cognitive hint to users that this is a number they can touch.
(Also, it protects against the default being changed elsewhere.)

FastMessageParser.
PayloadParser messageOptionSerializedMessage,
IActorRef messageOptionSenderOptional);
void Dispatch(IInternalActorRef messageOptionRecipient,
Member Author:

This last overload is an experiment gone wrong in async deserialization and should be removed.

IActorRef messageOptionSenderOptional);
}

public class DefaultMessageDispatcherActor : ActorBase
Member Author:

This needs to be removed or put somewhere else to be figured out.
It was part of an attempt to parallelize the remote dispatch. Failed miserably :)

Comment on lines +989 to +1003
//_sendArgs.SetBuffer(_dataToSend);
#pragma warning disable 4014
Task.Run(async () =>
{
foreach (var byteString in _dataToSend)
{
foreach (var buffer in byteString.Buffers)
{
await _connection.SslStream.WriteAsync(
buffer.Array,
buffer.Offset, buffer.Count);
}
}

return NotUsed.Instance;
Member Author:

This is probably Bad and Wrong. There's definitely some ugly going on:

  • Task.Run is a smell
  • Lots of back-and-forth here with awaits when we have small buffers (i.e. after manual serialization/framing/batching)
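One possible direction, sketched under the assumption that `_connection.SslStream` is exposed as in the snippet above. This is an illustration of the trade-off, not a drop-in fix:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using IO = Akka.IO;

// Compact the whole batch into one contiguous segment first, trading a
// single managed copy for one WriteAsync instead of an await round-trip
// per small buffer left over from serialization/framing/batching.
private async Task WriteBatchAsync(IEnumerable<IO.ByteString> dataToSend)
{
    var compacted = dataToSend
        .Aggregate(IO.ByteString.Empty, (acc, b) => acc + b)
        .Compact(); // may be a no-op if the data is already one buffer

    foreach (var seg in compacted.Buffers) // one segment after Compact()
        await _connection.SslStream.WriteAsync(seg.Array, seg.Offset, seg.Count);
}
```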

Comment on lines +195 to +200
public ByteString(IList<ByteString> buffers)
{
_count = buffers.Sum(s => s.Count);
_buffers = buffers.SelectMany(r=>r._buffers).ToArray();
}

Member Author:

So, this is useful for some cases, but looking at it I can't help but wish we had a nice way to pack this and fetch lazily (so we don't have to do this SelectMany+ToArray).
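A rough shape of that lazy variant (hypothetical, not part of the PR): keep the parent list and defer flattening until the buffers are actually enumerated.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Akka.IO;

// Hypothetical lazy concatenation: no SelectMany+ToArray up front; the
// flattened buffer sequence is produced on demand during enumeration.
public sealed class LazyConcatByteString
{
    private readonly IReadOnlyList<ByteString> _parts;

    public LazyConcatByteString(IReadOnlyList<ByteString> parts) => _parts = parts;

    public int Count => _parts.Sum(p => p.Count);

    public IEnumerable<ArraySegment<byte>> Buffers =>
        _parts.SelectMany(p => p.Buffers); // deferred; nothing materialized
}
```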

Comment on lines +461 to +465
internal ByteBuffer ReadOnlyCompacted()
{
var c = this.Compact();
return c._buffers[0];
}
Member Author:

The idea here is to get a single contiguous ArraySegment back; for smaller messages, the .Compact() step will most likely not copy/move anything.

In either case, the resulting ArraySegment (ByteBuffer) can then be used directly by either the 'proper' Protobuf deserializer or the custom deserializer.

Comment on lines -22 to +24
private static ThreadLocal<Random> _rng = new ThreadLocal<Random>(() => new Random(Interlocked.Increment(ref _seed)));
[ThreadStatic]
private static Random _rng2;
//private static ThreadLocal<Random> _rng = new ThreadLocal<Random>(() => new Random(Interlocked.Increment(ref _seed)));
Member Author:

Avoiding ThreadLocal wherever we don't need it for some special reason.

Reasoning here: https://ayende.com/blog/189761-A/production-postmortem-the-slow-slowdown-of-large-systems
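The diff above elides the accessor side; the usual pattern for a [ThreadStatic] replacement of ThreadLocal<T> looks roughly like this (a sketch, since a ThreadStatic field cannot carry a per-thread initializer the way ThreadLocal<T>'s factory does):

```csharp
using System;
using System.Threading;

static class ThreadLocalRandom
{
    private static int _seed = Environment.TickCount;

    [ThreadStatic]
    private static Random _rng;

    // Each thread lazily creates its own Random on first use; unlike
    // ThreadLocal<T>, there is no factory machinery on the access path.
    public static Random Current =>
        _rng ?? (_rng = new Random(Interlocked.Increment(ref _seed)));
}
```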

Comment on lines -347 to +348
private static readonly ThreadLocal<object[]> CurrentInterpreter = new ThreadLocal<object[]>(() => new object[1]);
[ThreadStatic]
private static object[] CurrentInterpreter;
Member Author:


2 participants