Skip to content

Commit a64eeaf

Browse files
Updates for producer-side. Loopback mapping renamed. Unifying startup/shutdown on both client and server pipe interfaces. IAE<T> helpers for test or some consumers. Producer methods and wiring. Adding default for some protocol.data parameters - empty data is legal for many frames. Test updates (some failing, but interim commit).
1 parent 5bab1d7 commit a64eeaf

16 files changed

+445
-75
lines changed

RSocket.Core.Tests/ClientServerTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public void TestInitialize()
5656
{
5757
Loopback = new Transports.LoopbackTransport(DuplexPipe.ImmediateOptions, DuplexPipe.ImmediateOptions);
5858
Server = new TestServer(Loopback);
59-
Server.Start();
60-
_Client = new Lazy<RSocketClient>(() => new RSocketClient(Loopback).ConnectAsync().Result);
59+
Server.Connect();
60+
_Client = new Lazy<RSocketClient>(() => { var rsocket = new RSocketClient(Loopback); rsocket.ConnectAsync().Wait(); return rsocket; });
6161
}
6262
}
6363

RSocket.Core.Tests/ServerTests.cs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
using Microsoft.VisualStudio.TestTools.UnitTesting;
8+
using RSocket.Collections.Generic;
9+
using RSocket.Transports;
10+
11+
namespace RSocket.Tests
12+
{
13+
[TestClass]
14+
public class ServerTests
15+
{
16+
LoopbackTransport Loopback;
17+
RSocketClient Client;
18+
RSocketClient.ForStrings StringClient;
19+
RSocketServer Server;
20+
21+
22+
[TestMethod]
23+
public void ServerBasicTest()
24+
{
25+
Assert.AreNotEqual(Loopback.Input, Loopback.Beyond.Input, "Loopback Client/Server Inputs shouldn't be same.");
26+
Assert.AreNotEqual(Loopback.Output, Loopback.Beyond.Output, "Loopback Client/Server Outputs shouldn't be same.");
27+
}
28+
29+
[TestMethod]
30+
public async Task ServerRequestResponseTest()
31+
{
32+
Server.Responder = async request => { await Task.CompletedTask; return (request.Data, request.Metadata); };
33+
var response = await StringClient.RequestResponse("TEST DATA", "METADATA?_____");
34+
Assert.AreEqual("TEST DATA", response, "Response should round trip.");
35+
}
36+
37+
[TestMethod]
38+
public void ServerRequestStreamTest()
39+
{
40+
Server.Streamer =
41+
// IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Streamer
42+
((ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata) request) =>
43+
{
44+
return Yield().AsyncEnumerate();
45+
46+
IEnumerable<Task<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)>> Yield()
47+
{
48+
yield return Delay(TimeSpan.FromMilliseconds(10), Task.FromResult((request.Data, request.Metadata)));
49+
yield return Delay(TimeSpan.FromMilliseconds(10), Task.FromResult((request.Data, request.Metadata)));
50+
yield return Delay(TimeSpan.FromMilliseconds(10), Task.FromResult((request.Data, request.Metadata)));
51+
async Task<T> Delay<T>(TimeSpan delay, Task<T> yield) { await Task.Delay(delay); return await yield; }
52+
}
53+
};
54+
55+
//var astream = StringClient.RequestStream("TEST DATA", "METADATA?_____");
56+
57+
var list = new List<string>();
58+
var data = "TEST DATA";
59+
var metadata = "METADATA?_____";
60+
61+
var source = Client.RequestStream<string>(value => Encoding.UTF8.GetString(value.data.ToArray()),
62+
new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(data)),
63+
metadata == default ? default : new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(metadata)));
64+
var enumerator = source.GetAsyncEnumerator();
65+
try { while (enumerator.MoveNextAsync().Result) { list.Add(enumerator.Current); } }
66+
finally { enumerator.DisposeAsync().AsTask().Wait(); }
67+
68+
// var stream = astream.ToEnumerable().ToList();
69+
// Assert.AreEqual(3, stream.Count, "Stream contents missing.");
70+
}
71+
72+
[TestInitialize]
73+
public void TestInitialize()
74+
{
75+
Loopback = new Transports.LoopbackTransport(DuplexPipe.ImmediateOptions, DuplexPipe.ImmediateOptions);
76+
Client = new RSocketClient(Loopback);
77+
Server = new RSocketServer(Loopback.Beyond);
78+
Client.ConnectAsync().Wait();
79+
Server.ConnectAsync().Wait();
80+
StringClient = new RSocketClient.ForStrings(Client);
81+
//_Client = new Lazy<RSocketClient>(() => new RSocketClient(Loopback).ConnectAsync().Result);
82+
//RSocketClient Client => _Client.Value; Lazy<RSocketClient> _Client;
83+
//RSocketServer Server => _Server.Value; Lazy<RSocketServer> _Server;
84+
}
85+
}
86+
87+
88+
public class Sample
89+
{
90+
static Random random = new Random(1234);
91+
public int Id = random.Next(1000000);
92+
public string Name = nameof(Sample) + random.Next(10000).ToString();
93+
public DateTime Created = DateTime.Now;
94+
95+
public static implicit operator string(Sample value) => string.Join('|', value.Id, value.Name, value.Created);
96+
public static implicit operator ReadOnlySequence<byte>(Sample value) => new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(string.Join('|', value.Id, value.Name, value.Created)));
97+
public static implicit operator Sample(string value) { var values = value.Split('|'); return new Sample(values[0], values[1], values[2]); }
98+
public static implicit operator Sample(ReadOnlySequence<byte> value) => Encoding.UTF8.GetString(value.ToArray());
99+
100+
public Sample() { }
101+
public Sample(string id, string name, string created) { Id = int.Parse(id); Name = name; Created = DateTime.Parse(created); }
102+
public ReadOnlySequence<byte> Bytes => this;
103+
}
104+
}

RSocket.Core.Tests/TestServer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace RSocket.Tests
1313
{
14-
public class TestServer : RSocketServer
14+
public class TestServer : RSocket
1515
{
1616
public IRSocketStream Stream = new StreamReceiver();
1717
public List<Message> All = new List<Message>();
@@ -20,10 +20,10 @@ public class TestServer : RSocketServer
2020
public IEnumerable<Message.Setup> Setups => from message in Server.OfType<Message.Setup>() select message;
2121
public IEnumerable<Message.RequestStream> RequestStreams => from message in Server.OfType<Message.RequestStream>() select message;
2222

23-
public TestServer(IRSocketServerTransport transport) : base(transport) { }
23+
public TestServer(IRSocketTransport transport) : base(transport) { }
2424

25-
public override void Setup(in RSocketProtocol.Setup value) => All.Add(new Message.Setup(value));
26-
public override void RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => All.Add(new Message.RequestStream(message));
25+
//public override void Setup(in RSocketProtocol.Setup value) => All.Add(new Message.Setup(value));
26+
//public override void RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => All.Add(new Message.RequestStream(message));
2727

2828

2929
public class Message

RSocket.Core/IAsyncEnumerable.cs

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#region Assembly System.Runtime, Version=4.2.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a
22
// .nuget\packages\microsoft.netcore.app\3.0.0-preview-27324-5\ref\netcoreapp3.0\System.Runtime.dll
33
#endregion
4-
54
using System.Threading;
65
using System.Threading.Tasks;
6+
using System.Collections.Generic;
77

88
namespace RSocket.Collections.Generic
99
{
@@ -22,4 +22,152 @@ public interface IAsyncDisposable
2222
{
2323
ValueTask DisposeAsync();
2424
}
25+
26+
27+
28+
static public class IAsyncEnumerableExtensions
29+
{
30+
static public IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> source)
31+
{
32+
var enumerator = source.GetAsyncEnumerator();
33+
try { while (enumerator.MoveNextAsync().Result) { yield return enumerator.Current; } }
34+
finally { enumerator.DisposeAsync().AsTask().Wait(); }
35+
}
36+
37+
38+
static public IAsyncEnumerable<T> AsyncEnumerate<T>(this IEnumerable<Task<T>> source) => new AsyncEnumerable<T>(source);
39+
40+
private class AsyncEnumerable<T> : IAsyncEnumerable<T>
41+
{
42+
readonly IEnumerable<Task<T>> Enumerable;
43+
44+
public AsyncEnumerable(IEnumerable<Task<T>> enumerable) { Enumerable = enumerable; }
45+
46+
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) => new AsyncTaskEnumerator(Enumerable.GetEnumerator());
47+
48+
private class AsyncTaskEnumerator : IAsyncEnumerator<T>
49+
{
50+
readonly IEnumerator<Task<T>> Enumerator;
51+
public T Current { get; private set; }
52+
53+
public AsyncTaskEnumerator(IEnumerator<Task<T>> enumerator) { Enumerator = enumerator; }
54+
55+
public async ValueTask<bool> MoveNextAsync()
56+
{
57+
while (Enumerator.MoveNext())
58+
{
59+
Current = await Enumerator.Current;
60+
return true;
61+
}
62+
return false;
63+
}
64+
65+
public ValueTask DisposeAsync() { Enumerator.Dispose(); return new ValueTask(); }
66+
}
67+
68+
69+
70+
71+
72+
// //readonly Func<IRSocketStream, Task> Subscriber;
73+
// //readonly Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), T> Mapper;
74+
75+
// //public Receiver(Func<IRSocketStream, Task> subscriber, Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), T> mapper)
76+
// //{
77+
// // Subscriber = subscriber;
78+
// // Mapper = mapper;
79+
// //}
80+
81+
// public async Task<T> ExecuteAsync(CancellationToken cancellation = default)
82+
// {
83+
// var receiver = new Receiver();
84+
// await Subscriber(receiver);
85+
// var result = await receiver.Awaitable;
86+
// return Mapper((result.data, result.metadata));
87+
// }
88+
89+
// public async Task<T> ExecuteAsync(T result, CancellationToken cancellation = default)
90+
// {
91+
// var receiver = new Receiver();
92+
// await Subscriber(receiver);
93+
// return result;
94+
// }
95+
96+
// public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellation = default)
97+
// {
98+
// var enumerator = new MappedEnumerator(Mapper);
99+
// Subscriber(enumerator); //TODO Do we want to use this task too? It could fault. Also, cancellation. Nope, this should only await on the first MoveNext, so subscription is lower.
100+
// return enumerator;
101+
// }
102+
103+
// private class Enumerator : IAsyncEnumerator<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>, IRSocketStream
104+
// {
105+
// public bool IsCompleted { get; private set; } = false;
106+
// private (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) Value = default;
107+
// private ConcurrentQueue<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)> Queue;
108+
// private AsyncManualResetEvent Continue = new AsyncManualResetEvent();
109+
// private Exception Error;
110+
111+
// public Enumerator()
112+
// {
113+
// Queue = new ConcurrentQueue<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>();
114+
// }
115+
116+
// public async ValueTask<bool> MoveNextAsync()
117+
// {
118+
// while (true)
119+
// {
120+
// if (Queue.TryDequeue(out Value)) { return true; }
121+
// await Continue.WaitAsync();
122+
// if (Error != default) { throw Error; }
123+
// else if (IsCompleted) { return false; }
124+
// else { Continue.Reset(); }
125+
// }
126+
// }
127+
128+
// public (ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) Current => Value;
129+
130+
// public ValueTask DisposeAsync()
131+
// {
132+
// return new ValueTask();
133+
// }
134+
135+
// public void OnCompleted() { IsCompleted = true; ; Continue.Set(); }
136+
// public void OnError(Exception error) { Error = error; Continue.Set(); }
137+
// public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
138+
// {
139+
// //TODO Would we really need to interlock this? If the Queue isn't allocated, it's the first time through...?
140+
// //var value = Interlocked.Exchange<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(ref Value, default); //TODO Hmmm, no ValueTuples... Could save Queue allocation if only going to get one...
141+
// Queue.Enqueue(value);
142+
// Continue.Set();
143+
// }
144+
145+
// class AsyncManualResetEvent //Steven Toub: https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-1-asyncmanualresetevent/
146+
// {
147+
// private volatile TaskCompletionSource<bool> Completion = new TaskCompletionSource<bool>();
148+
// public Task WaitAsync() => Completion.Task;
149+
// public void Set() { Completion.TrySetResult(true); }
150+
// public void Reset() { while (true) { var previous = Completion; if (!previous.Task.IsCompleted || Interlocked.CompareExchange(ref Completion, new TaskCompletionSource<bool>(), previous) == previous) { return; } } }
151+
// }
152+
// }
153+
154+
// private class MappedEnumerator : Enumerator, IAsyncEnumerator<T>, IRSocketStream
155+
// {
156+
// readonly Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> Mapper;
157+
158+
// public MappedEnumerator(Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> mapper)
159+
// {
160+
// Mapper = mapper;
161+
// }
162+
163+
// public new T Current => Mapper(base.Current);
164+
// //ReadOnlySequence<byte> IAsyncEnumerator<ReadOnlySequence<byte>>.Current => Value.data;
165+
166+
// public new ValueTask DisposeAsync()
167+
// {
168+
// return base.DisposeAsync();
169+
// }
170+
// }
171+
}
172+
}
25173
}

RSocket.Core/IRSocketTransports.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ public interface IRSocketTransport
1313
PipeReader Input { get; }
1414
PipeWriter Output { get; }
1515

16-
Task ConnectAsync(CancellationToken cancel = default);
16+
Task StartAsync(CancellationToken cancel = default);
17+
Task StopAsync();
1718
}
1819

1920
/// <summary>
@@ -24,7 +25,7 @@ public interface IRSocketServerTransport
2425
PipeReader Input { get; }
2526
PipeWriter Output { get; }
2627

27-
Task StartAsync();
28+
Task StartAsync(CancellationToken cancel = default);
2829
Task StopAsync();
2930
}
3031
}

RSocket.Core/RSocket.Receiver.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ partial class RSocket
1515
public class Receiver<T> : IAsyncEnumerable<T>
1616
{
1717
readonly Func<IRSocketStream, Task> Subscriber;
18-
readonly Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> Mapper;
18+
readonly Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), T> Mapper;
1919

20-
public Receiver(Func<IRSocketStream, Task> subscriber, Func<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data), T> mapper)
20+
public Receiver(Func<IRSocketStream, Task> subscriber, Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), T> mapper)
2121
{
2222
Subscriber = subscriber;
2323
Mapper = mapper;
@@ -27,7 +27,8 @@ public async Task<T> ExecuteAsync(CancellationToken cancellation = default)
2727
{
2828
var receiver = new Receiver();
2929
await Subscriber(receiver);
30-
return Mapper(await receiver.Awaitable);
30+
var result = await receiver.Awaitable;
31+
return Mapper((result.data, result.metadata));
3132
}
3233

3334
public async Task<T> ExecuteAsync(T result, CancellationToken cancellation = default)

0 commit comments

Comments
 (0)