Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams TCP Transport with Hand-Written Protobuf Deserialization #4594

Draft
wants to merge 27 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0cbffbf
save state
to11mtm Oct 3, 2020
d4e7c8e
save
to11mtm Oct 3, 2020
a2217e2
Clean up materialization (arguably)
to11mtm Oct 3, 2020
52e931b
improve batching
to11mtm Oct 3, 2020
831d032
improve stage
to11mtm Oct 4, 2020
eb42e44
Tweak Stage to where we are are about equal to Dotnetty with batching…
to11mtm Oct 4, 2020
5c93ec5
save batch updates and other fixes
to11mtm Oct 8, 2020
0baec4f
Update with fixes for DNS
to11mtm Oct 9, 2020
20f7e82
allow netcore improvements via multi package, clean up stages, allow …
to11mtm Oct 11, 2020
72d8a5e
clean up
to11mtm Oct 15, 2020
07f7128
Make sure Protobuf bytearray hack won't blow up if protobuf internals…
to11mtm Oct 17, 2020
c7a3e5e
Add Configuration bits
to11mtm Oct 17, 2020
173d15d
Serialize Protobuf manually using IO.ByteString on AkkaProtocolTransp…
to11mtm Oct 17, 2020
9e4f20e
Clean up fixes to BatchWriter
to11mtm Oct 17, 2020
08739f9
Allow ArraySegments to be used in InboundPayload to lower copies in T…
to11mtm Oct 17, 2020
1efd76f
Try manually serializing protobuf payloads
to11mtm Oct 19, 2020
4b3d081
test for reverse
to11mtm Oct 20, 2020
985bc11
Revert "test for reverse"
to11mtm Oct 20, 2020
e8551e5
checkpoint all the things
to11mtm Oct 24, 2020
1f5e9e4
working impl
to11mtm Oct 24, 2020
3a98998
- Hand-Written Protobuf Deserializer to minimize allocations
to11mtm Oct 25, 2020
98b7e11
Push Changes Where everything is on it's own dispatcher and pretty ha…
to11mtm Oct 29, 2020
d04fbc1
fix unit test
to11mtm Oct 29, 2020
ba46ce5
Merge branch 'dev' into remote-full-manual-protobuf-deser
to11mtm Oct 29, 2020
2be3022
Clean up AkkaPduCodec.cs
to11mtm Oct 30, 2020
f490009
Merge branch 'remote-full-manual-protobuf-deser' of https://github.co…
to11mtm Oct 31, 2020
3cfd7a2
Fix Negative Int32 parsing, be a little smarter about manifest cachin…
to11mtm Oct 31, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/Akka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Protobuf", "Protobuf", "{98
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{10C5B1E8-15B5-4EB3-81AE-1FC054FE1305}"
ProjectSection(SolutionItems) = preProject
..\build.fsx = ..\build.fsx
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Contrib", "Contrib", "{588C1513-FAB6-42C3-B6FC-3485F13620CF}"
EndProject
Expand Down
38 changes: 30 additions & 8 deletions src/benchmark/RemotePingPong/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr

dot-netty.tcp {
port = 0
hostname = ""localhost""
hostname = """"
batching {
enabled = true
flush-interval = 40ms
}
}
}
}");
Expand All @@ -69,7 +73,14 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr

private static void Main(params string[] args)
{
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.High;
AppDomain.CurrentDomain.UnhandledException += (sender, eventArgs) =>
{
Console.WriteLine(eventArgs.ExceptionObject as Exception);
Console.WriteLine("STACKOVERFLOW");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did this come up inside the benchmark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhhhh, it rather involved some Spans and FastHash while I was debugging... :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL

Console.WriteLine(sender);
Console.WriteLine(eventArgs.ExceptionObject as Exception);
};
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.AboveNormal;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably port that change back into a separate PR - good idea.

edit: just to clarify - this change is so good and small that it should find its way into the dev branch straight away

uint timesToRun;
if (args.Length == 0 || !uint.TryParse(args[0], out timesToRun))
{
Expand All @@ -82,7 +93,7 @@ private static void Main(params string[] args)

private static async void Start(uint timesToRun)
{
const long repeat = 100000L;
const long repeat = 50000L;

var processorCount = Environment.ProcessorCount;
if (processorCount == 0)
Expand Down Expand Up @@ -116,9 +127,18 @@ private static async void Start(uint timesToRun)
var bestThroughput = 0L;
foreach (var throughput in GetClientSettings())
{
var result1 = await Benchmark(throughput, repeat, bestThroughput, redCount);
bestThroughput = result1.Item2;
redCount = result1.Item3;
try
{
var result1 = await Benchmark(throughput, repeat, bestThroughput, redCount);
bestThroughput = result1.Item2;
redCount = result1.Item3;
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}

}
}

Expand Down Expand Up @@ -182,16 +202,18 @@ private static long GetTotalMessagesReceived(int numberOfClients, long numberOfR
throw new Exception("Received report that 1 or more remote actor is unable to begin the test. Aborting run.");
}

var rng = new Random();
var rand = new byte[2048];
rng.NextBytes(rand);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this bit in here because it was interesting to note that some of my tweaks had to change when I started throwing messages a bit larger; If this number used passing this differs a LOT from smaller messages in ping-pong, changing some of the batching parameters (i.e. count, size) usually brings things back in-line.

var sw = Stopwatch.StartNew();
receivers.ForEach(c =>
{
for (var i = 0; i < 50; i++) // prime the pump so EndpointWriters can take advantage of their batching model
c.Tell("hit");
c.Tell("hi");
});
var waiting = Task.WhenAll(tasks);
await Task.WhenAll(waiting);
sw.Stop();

// force clean termination
var termination = Task.WhenAll(new[] { system1.Terminate(), system2.Terminate() }).Wait(TimeSpan.FromSeconds(10));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to reference common.props here.

For example:

<Import Project="..\..\..\common.props" />
<PropertyGroup>
<AssemblyTitle>Akka.Persistence.Sqlite</AssemblyTitle>
<Description>Akka.NET Persistence journal and snapshot store backed by SQLite.</Description>
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
<PackageTags>$(AkkaPackageTags);persistence;eventsource;sql;sqlite</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\core\Akka.Remote\Akka.Remote.csproj" />
<ProjectReference Include="..\..\..\core\Akka.Streams\Akka.Streams.csproj" />
</ItemGroup>

</Project>
Loading