Skip to content

Commit 11a968e

Browse files
Fixes to TCP/WS Framing (committing to length-only (no EoM) so that the transport doesn't have to deal with multiple frames in a stream and reframing them. Fixes to server-side wiring of protocol handling to enable bi-directional server calls (untested yet). New RPC generated classes and helper base class (in-progress). EchoService as first prototype in RPC Sample.
1 parent 7ef0de5 commit 11a968e

21 files changed

+566
-162
lines changed

RSocket.Core.Tests/TestServer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public class TestServer : RSocketServer
2424
public TestServer(IRSocketServerTransport transport) : base(transport) { }
2525

2626
public override void Setup(in RSocketProtocol.Setup value) => All.Add(new Message.Setup(value));
27-
public override void RequestStream(in RSocketProtocol.RequestStream value) => All.Add(new Message.RequestStream(value));
28-
27+
public override void RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => All.Add(new Message.RequestStream(message));
28+
2929

3030
public class Message
3131
{

RSocket.Core/BufferWriter.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ private Span<byte> GetBuffer(int needed)
7272

7373
public int WriteUInt16BigEndian(int value) => WriteUInt16BigEndian((UInt16)value);
7474
public int WriteUInt16BigEndian(UInt16 value) { BinaryPrimitives.WriteUInt16BigEndian(GetBuffer(sizeof(UInt16)), value); Used += sizeof(UInt16); return sizeof(UInt16); }
75+
76+
//public int WriteUInt16LittleEndian(UInt16 value) { BinaryPrimitives.WriteUInt16LittleEndian(GetBuffer(sizeof(UInt16)), value); Used += sizeof(UInt16); return sizeof(UInt16); }
77+
7578
public int WriteInt32BigEndian(Int32 value) { BinaryPrimitives.WriteInt32BigEndian(GetBuffer(sizeof(Int32)), value); Used += sizeof(Int32); return sizeof(Int32); }
7679
public int WriteInt64BigEndian(Int64 value) { BinaryPrimitives.WriteInt64BigEndian(GetBuffer(sizeof(Int64)), value); Used += sizeof(Int64); return sizeof(Int64); }
7780
public int WriteUInt32BigEndian(UInt32 value) { BinaryPrimitives.WriteUInt32BigEndian(GetBuffer(sizeof(UInt32)), value); Used += sizeof(UInt32); return sizeof(UInt32); }

RSocket.Core/IRSocketProtocol.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ public interface IRSocketProtocol
1111
void Setup(in RSocketProtocol.Setup message);
1212
void Error(in RSocketProtocol.Error message);
1313
void Payload(in RSocketProtocol.Payload message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data);
14-
void RequestStream(in RSocketProtocol.RequestStream message);
14+
void RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data);
15+
void RequestResponse(in RSocketProtocol.RequestResponse message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data);
16+
void RequestFireAndForget(in RSocketProtocol.RequestFireAndForget message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data);
17+
void RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data);
1518
}
1619
}

RSocket.Core/RSocketClient.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public RSocketClient(IRSocketTransport transport, RSocketClientOptions options =
3737
public async Task<RSocketClient> ConnectAsync()
3838
{
3939
await Transport.ConnectAsync();
40-
var server = RSocketProtocol.Handler2(this, Transport.Input, CancellationToken.None);
40+
var server = RSocketProtocol.Handler(this, Transport.Input, CancellationToken.None, name: nameof(RSocketClient));
4141
////TODO Move defaults to policy object
4242
new RSocketProtocol.Setup(keepalive: TimeSpan.FromSeconds(60), lifetime: TimeSpan.FromSeconds(180), metadataMimeType: "binary", dataMimeType: "binary").Write(Transport.Output);
4343
await Transport.Output.FlushAsync();
@@ -132,7 +132,10 @@ void IRSocketProtocol.Payload(in RSocketProtocol.Payload message, ReadOnlySequen
132132
}
133133

134134
void IRSocketProtocol.Setup(in RSocketProtocol.Setup value) => throw new InvalidOperationException($"Client cannot process Setup frames");
135-
void IRSocketProtocol.RequestStream(in RSocketProtocol.RequestStream message) => throw new NotImplementedException(); //TODO How to handle unexpected messagess...
136-
void IRSocketProtocol.Error(in RSocketProtocol.Error message) { throw new NotImplementedException(); } //TODO Handle Errors!
135+
void IRSocketProtocol.Error(in RSocketProtocol.Error message) { throw new NotImplementedException(); } //TODO Handle Errors!
136+
void IRSocketProtocol.RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => throw new NotImplementedException(); //TODO How to handle unexpected messagess...
137+
void IRSocketProtocol.RequestResponse(in RSocketProtocol.RequestResponse message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => throw new NotImplementedException();
138+
void IRSocketProtocol.RequestFireAndForget(in RSocketProtocol.RequestFireAndForget message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => throw new NotImplementedException();
139+
void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => throw new NotImplementedException();
137140
}
138141
}

RSocket.Core/RSocketProtocol.Handler.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ partial class RSocketProtocol
1616
static void OnSetup(IRSocketProtocol sink, in RSocketProtocol.Setup message) => sink.Setup(message);
1717
static void OnError(IRSocketProtocol sink, in RSocketProtocol.Error message) => sink.Error(message);
1818
static void OnPayload(IRSocketProtocol sink, in RSocketProtocol.Payload message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => sink.Payload(message, metadata, data);
19-
static void OnRequestStream(IRSocketProtocol sink, in RSocketProtocol.RequestStream message) => sink.RequestStream(message);
20-
19+
static void OnRequestStream(IRSocketProtocol sink, in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => sink.RequestStream(message, metadata, data);
20+
static void OnRequestResponse(IRSocketProtocol sink, in RSocketProtocol.RequestResponse message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => sink.RequestResponse(message, metadata, data);
21+
static void OnRequestFireAndForget(IRSocketProtocol sink, in RSocketProtocol.RequestFireAndForget message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => sink.RequestFireAndForget(message, metadata, data);
22+
static void OnRequestChannel(IRSocketProtocol sink, in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => sink.RequestChannel(message, metadata, data);
2123

2224
static public async Task Handler(IRSocketProtocol sink, PipeReader pipereader, CancellationToken cancellation, string name = null)
2325
{
@@ -31,12 +33,13 @@ static public async Task Handler(IRSocketProtocol sink, PipeReader pipereader, C
3133
var position = buffer.Start;
3234

3335
//Due to the nature of Pipelines as simple binary pipes, all Transport adapters assemble a standard message frame whether or not the underlying transport signals length, EoM, etc.
34-
if (!TryReadInt32BigEndian(buffer, ref position, out int frame)) { pipereader.AdvanceTo(buffer.Start, buffer.End); continue; }
35-
var (framelength, ismessageend) = MessageFrame(frame);
36-
if (buffer.Length < framelength) { pipereader.AdvanceTo(buffer.Start, buffer.End); continue; } //Don't have a complete message yet. Tell the pipe that we've evaluated up to the current buffer end, but cannot yet consume it.
36+
var (Length, IsEndOfMessage) = MessageFramePeek(buffer);
37+
//if (!TryReadInt24BigEndian(buffer, ref position, out int frame)) { pipereader.AdvanceTo(buffer.Start, buffer.End); continue; }
38+
//var (framelength, ismessageend) = MessageFrame(frame);
39+
if (buffer.Length < Length + MESSAGEFRAMESIZE) { pipereader.AdvanceTo(buffer.Start, buffer.End); continue; } //Don't have a complete message yet. Tell the pipe that we've evaluated up to the current buffer end, but cannot yet consume it.
3740

38-
Process(framelength, buffer.Slice(position));
39-
pipereader.AdvanceTo(buffer.GetPosition(framelength, position));
41+
Process(Length, buffer.Slice(position = buffer.GetPosition(MESSAGEFRAMESIZE, position), Length));
42+
pipereader.AdvanceTo(position = buffer.GetPosition(Length, position));
4043
//TODO - this should work now too!!! Need to evaluate if there is more than one packet in the pipe including edges like part of the length bytes are there but not all.
4144
}
4245
pipereader.Complete();
@@ -69,16 +72,19 @@ void Process(int framelength, ReadOnlySequence<byte> sequence)
6972
break;
7073
case Types.Request_Response:
7174
var requestresponse = new RequestResponse(header, ref reader);
75+
if (requestresponse.Validate()) { OnRequestResponse(sink, requestresponse, requestresponse.ReadMetadata(ref reader), requestresponse.ReadData(ref reader)); }
7276
break;
7377
case Types.Request_Fire_And_Forget:
7478
var requestfireandforget = new RequestFireAndForget(header, ref reader);
79+
if (requestfireandforget.Validate()) { OnRequestFireAndForget(sink, requestfireandforget, requestfireandforget.ReadMetadata(ref reader), requestfireandforget.ReadData(ref reader)); }
7580
break;
7681
case Types.Request_Stream:
7782
var requeststream = new RequestStream(header, ref reader);
78-
if (requeststream.Validate()) { OnRequestStream(sink, requeststream); }
83+
if (requeststream.Validate()) { OnRequestStream(sink, requeststream, requeststream.ReadMetadata(ref reader), requeststream.ReadData(ref reader)); }
7984
break;
8085
case Types.Request_Channel:
8186
var requestchannel = new RequestChannel(header, ref reader);
87+
if (requestchannel.Validate()) { OnRequestChannel(sink, requestchannel, requestchannel.ReadMetadata(ref reader), requestchannel.ReadData(ref reader)); }
8288
break;
8389
case Types.Request_N:
8490
var requestne = new RequestN(header, ref reader);

RSocket.Core/RSocketProtocol.StateMachine.cs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,19 @@ void Process(int framelength, ReadOnlySequence<byte> sequence)
8585
break;
8686
case Types.Request_Response:
8787
var requestresponse = new RequestResponse(header, ref reader);
88+
if (requestresponse.Validate()) { OnRequestResponse(sink, requestresponse, requestresponse.ReadMetadata(ref reader), requestresponse.ReadData(ref reader)); }
8889
break;
8990
case Types.Request_Fire_And_Forget:
9091
var requestfireandforget = new RequestFireAndForget(header, ref reader);
92+
if (requestfireandforget.Validate()) { OnRequestFireAndForget(sink, requestfireandforget, requestfireandforget.ReadMetadata(ref reader), requestfireandforget.ReadData(ref reader)); }
9193
break;
9294
case Types.Request_Stream:
9395
var requeststream = new RequestStream(header, ref reader);
94-
if (requeststream.Validate()) { OnRequestStream(sink, requeststream); }
96+
if (requeststream.Validate()) { OnRequestStream(sink, requeststream, requeststream.ReadMetadata(ref reader), requeststream.ReadData(ref reader)); }
9597
break;
9698
case Types.Request_Channel:
9799
var requestchannel = new RequestChannel(header, ref reader);
100+
if (requestchannel.Validate()) { OnRequestChannel(sink, requestchannel, requestchannel.ReadMetadata(ref reader), requestchannel.ReadData(ref reader)); }
98101
break;
99102
case Types.Request_N:
100103
var requestne = new RequestN(header, ref reader);
@@ -444,32 +447,32 @@ static public async Task Handler(IRSocketProtocol sink, PipeReader pipereader, C
444447
// else { throw new InvalidOperationException(); } //This is impossible, since we had at least one byte...
445448
//}
446449

447-
//static bool TryReadInt24BigEndian(ReadOnlySequence<byte> buffer, ref SequencePosition position, out Int32 value)
448-
//{
449-
// const int SIZEOF = 3;
450-
// buffer = buffer.Slice(position, SIZEOF);
451-
// if (buffer.TryGet(ref position, out var memory)) //TODO is this better as IsSingleSegment or unrolled loop or even read 4 bytes and if (BinaryPrimitives.TryReadInt32BigEndian(span, out var result) { value = result & 0xFFFFFF; }
452-
// {
453-
// System.Diagnostics.Debug.Assert(memory.Length == SIZEOF);
454-
// var span = memory.Span;
455-
// value = (span[0]) | (span[1] << 8) | (span[2] << 16);
456-
// }
457-
// else
458-
// {
459-
// if (buffer.Length < SIZEOF) { value = 0; return false; }
460-
// Int32 result = 0;
461-
// foreach (var subbuffer in buffer)
462-
// {
463-
// for (int index = 0; index < subbuffer.Length; index++)
464-
// {
465-
// result = (result << 8 | subbuffer.Span[index]);
466-
// }
467-
// }
468-
// value = result;
469-
// }
470-
// position = buffer.GetPosition(SIZEOF);
471-
// return true;
472-
//}
450+
static bool TryReadInt24BigEndian(ReadOnlySequence<byte> buffer, ref SequencePosition position, out Int32 value)
451+
{
452+
const int SIZEOF = 3;
453+
buffer = buffer.Slice(position, SIZEOF);
454+
if (buffer.TryGet(ref position, out var memory)) //TODO is this better as IsSingleSegment or unrolled loop or even read 4 bytes and if (BinaryPrimitives.TryReadInt32BigEndian(span, out var result) { value = result & 0xFFFFFF; }
455+
{
456+
System.Diagnostics.Debug.Assert(memory.Length == SIZEOF);
457+
var span = memory.Span;
458+
value = (span[0]) | (span[1] << 8) | (span[2] << 16);
459+
}
460+
else
461+
{
462+
if (buffer.Length < SIZEOF) { value = 0; return false; }
463+
Int32 result = 0;
464+
foreach (var subbuffer in buffer)
465+
{
466+
for (int index = 0; index < subbuffer.Length; index++)
467+
{
468+
result = (result << 8 | subbuffer.Span[index]);
469+
}
470+
}
471+
value = result;
472+
}
473+
position = buffer.GetPosition(SIZEOF);
474+
return true;
475+
}
473476

474477
static bool TryReadInt32BigEndian(ReadOnlySequence<byte> buffer, ref SequencePosition position, out Int32 value)
475478
{

0 commit comments

Comments
 (0)