Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/revamp #105

Open
wants to merge 53 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
51ae923
taking shape
eduard-dumitru Jun 11, 2024
ee679a0
temporarily disable branch check for CI nuget push
eduard-dumitru Jun 11, 2024
f1927a2
Fix/namedpipe close (#97)
danutboanta Feb 2, 2024
047d405
cherry pick fix to Spurious Closed Pipes (ROBO-3083)
eduard-dumitru Jun 12, 2024
753be9c
rename assembly and package
eduard-dumitru Jun 12, 2024
a25cd01
fixes
eduard-dumitru Jun 12, 2024
22df014
decommission object params (#103)
eduard-dumitru Jun 26, 2024
2601f27
decommission cryptography
eduard-dumitru Jun 27, 2024
0959a95
change namespace from UiPath.CoreIpc to UiPath.Ipc
eduard-dumitru Jun 27, 2024
71cc234
fixes after namespace change
eduard-dumitru Jun 27, 2024
ab26f0c
fix tests
eduard-dumitru Jun 27, 2024
b53aeb0
reduce public api 1st round
eduard-dumitru Jun 27, 2024
8897b05
downgrade Microsoft.Extensions.DependencyInjection.Abstractions back …
eduard-dumitru Jul 1, 2024
a044fc5
revamp configuration and callback management
eduard-dumitru Jul 4, 2024
da0d120
static all the things almost
eduard-dumitru Jul 4, 2024
84f780e
taking shape
eduard-dumitru Jul 5, 2024
7158d57
extensibility
eduard-dumitru Jul 10, 2024
1df069a
after quick review
eduard-dumitru Jul 15, 2024
45e3370
simplify IpcServer config and decommission server's Callbacks setting
eduard-dumitru Jul 17, 2024
9f1a386
taking shape
eduard-dumitru Jul 17, 2024
b6930af
taking shape
eduard-dumitru Jul 24, 2024
481ef4e
taking shape
eduard-dumitru Jul 25, 2024
c87d9bf
working state
eduard-dumitru Jul 26, 2024
8f7f5be
tests taking shape
eduard-dumitru Jul 26, 2024
46ce612
fixes and test reconciliation
eduard-dumitru Jul 27, 2024
a491e3a
fix js NodeInterop
eduard-dumitru Jul 27, 2024
be30f9c
taking shape
eduard-dumitru Jul 29, 2024
4b29ac9
GetProxy simplification
eduard-dumitru Jul 29, 2024
5fdca6d
allow BeforeCall to set AsyncLocal for the targeted method itself
eduard-dumitru Jul 29, 2024
321dbe0
adjust public members
eduard-dumitru Jul 31, 2024
2b5b90a
system tests
eduard-dumitru Aug 4, 2024
7bfd50f
fix test brittleness
eduard-dumitru Aug 6, 2024
3506106
tests working
eduard-dumitru Aug 6, 2024
1cfc3c3
decomission Pollyfill nuget and replace it with manual polyfills
eduard-dumitru Aug 6, 2024
decb270
decommission Nito.AsyncEx
eduard-dumitru Aug 6, 2024
521e536
fix bug
eduard-dumitru Aug 7, 2024
c6c737c
decomission old tests
eduard-dumitru Aug 7, 2024
e966d75
fixes, tweaks and enhancements
eduard-dumitru Aug 7, 2024
0135fa5
integrate js nodeinterop
eduard-dumitru Aug 7, 2024
bc60228
fix nuget.config and move benchmarks
eduard-dumitru Aug 7, 2024
4e24f36
tweak test timeouts
eduard-dumitru Aug 7, 2024
3f9902d
fix nodeinterop integration
eduard-dumitru Aug 7, 2024
558f282
benchmark fixes
eduard-dumitru Aug 7, 2024
cc84e0f
nodeinterop fixes
eduard-dumitru Aug 8, 2024
ef34081
expand router to all inherited interfaces with last come wins semantics
eduard-dumitru Aug 8, 2024
e93e8ed
namedpipe impersonation fix
eduard-dumitru Aug 12, 2024
b5ce48c
fix: no beforecall for incomming callbacks
eduard-dumitru Aug 13, 2024
28c6d3c
simplify Ipc client api
eduard-dumitru Aug 15, 2024
b1dc8ab
reintroduce ConnectionFactory as BeforeConnect
eduard-dumitru Aug 16, 2024
6dd4d9a
commission waitforstart
eduard-dumitru Aug 26, 2024
2f07b37
decommission IAsyncStream and the Network union
eduard-dumitru Aug 28, 2024
90aff05
update public API report
eduard-dumitru Aug 28, 2024
1811e00
fixes and simplification
eduard-dumitru Aug 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
working state
  • Loading branch information
eduard-dumitru committed Jul 26, 2024
commit c87d9bf59bf084d0503f744b93124de6bc5a0c57
6 changes: 3 additions & 3 deletions src/Playground/Playground.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions src/Playground/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Playground;
using UiPath.CoreIpc.Http;
using UiPath.Ipc;
using UiPath.Ipc.Transport.NamedPipe;

Expand Down Expand Up @@ -42,8 +41,7 @@ private static async Task Main(string[] args)
Endpoints = new()
{
typeof(Contracts.IServerOperations),
typeof(Contracts.IClientOperations2),
{ typeof(Contracts.IClientOperations2), new object() }
typeof(Contracts.IClientOperations2)
},
Listeners = [
new NamedPipeListener()
Expand Down Expand Up @@ -88,7 +86,9 @@ private static async Task Main(string[] args)
{ typeof(Contracts.IClientOperations2), new Impl.Client2() }
},
Scheduler = clientScheduler
}.GetProxy<Contracts.IServerOperations>();
}
.GetProxyFactory()
.GetProxy<Contracts.IServerOperations>();

await proxy1.Register();
await proxy1.Broadcast("Hello Bidirectional Http!");
Expand Down
258 changes: 258 additions & 0 deletions src/UiPath.CoreIpc.Http/BidiHttpListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
using Nito.AsyncEx;
using System.Buffers;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Net;
using System.Threading.Channels;
using UiPath.Ipc;

namespace UiPath.CoreIpc.Http;

using static Constants;
using IBidiHttpListenerConfig = IListenerConfig<BidiHttpListener, BidiHttpListenerState, BidiHttpServerConnectionState>;

public sealed partial record BidiHttpListener : ListenerConfig, IBidiHttpListenerConfig
{
public required Uri Uri { get; init; }

BidiHttpListenerState IBidiHttpListenerConfig.CreateListenerState(IpcServer server)
=> new(server, this);

BidiHttpServerConnectionState IBidiHttpListenerConfig.CreateConnectionState(IpcServer server, BidiHttpListenerState listenerState)
=> new(server, listenerState);

async ValueTask<Network> IBidiHttpListenerConfig.AwaitConnection(BidiHttpListenerState listenerState, BidiHttpServerConnectionState connectionState, CancellationToken ct)
{
await connectionState.WaitForConnection(ct);
return connectionState;
}

public IEnumerable<string> Validate()
{
throw new NotImplementedException();
}
}

internal sealed class BidiHttpListenerState : IAsyncDisposable
{
private readonly IpcServer _ipcServer;
private readonly CancellationTokenSource _cts = new();
private readonly HttpListener _httpListener;
private readonly Task _processing;
private readonly Lazy<Task> _disposing;

private readonly ConcurrentDictionary<Guid, Channel<HttpListenerContext>> _connections = new();
private readonly Channel<(Guid connectionId, Uri reverseUri)> _newConnections = Channel.CreateUnbounded<(Guid connectionId, Uri reverseUri)>();

public ChannelReader<(Guid connectionId, Uri reverseUri)> NewConnections => _newConnections.Reader;
public ChannelReader<HttpListenerContext> GetConnectionChannel(Guid connectionId) => _connections[connectionId];

public BidiHttpListenerState(IpcServer ipcServer, BidiHttpListener listener)
{
_ipcServer = ipcServer;
_httpListener = new HttpListener()
{
Prefixes =
{
listener.Uri.ToString()
}
};
_processing = ProcessContexts();
_disposing = new(DisposeCore);
}

public ValueTask DisposeAsync() => new(_disposing.Value);

private async Task DisposeCore()
{
_cts.Cancel();
try
{
await _processing;
}
catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token)
{
}

foreach (var pair in _connections)
{
pair.Value.Writer.Complete();
}
_cts.Dispose();
}

private async Task ProcessContexts()
{
await foreach (var (context, connectionId, reverseUri) in AwaitContexts())
{
var connectionChannel = _connections.GetOrAdd(connectionId, _ =>
{
_newConnections.Writer.TryWrite((connectionId, reverseUri));
return Channel.CreateUnbounded<HttpListenerContext>();
});

await connectionChannel.Writer.WriteAsync(context, _cts.Token);
}

async IAsyncEnumerable<(HttpListenerContext context, Guid connectionId, Uri reverseUri)> AwaitContexts()
{
while (!_cts.Token.IsCancellationRequested)
{
var context = await _httpListener.GetContextAsync();

if (!TryAcceptContext(context, out var connectionId, out var reverseUri))
{
context.Response.StatusCode = 400;
context.Response.Close();
continue;
}

yield return (context, connectionId, reverseUri);
}
}

bool TryAcceptContext(HttpListenerContext context, out Guid connectionId, [NotNullWhen(returnValue: true)] out Uri? reverseUri)
{
if (!Guid.TryParse(context.Request.Headers[ConnectionIdHeader], out connectionId) ||
!Uri.TryCreate(context.Request.Headers[ReverseUriHeader], UriKind.Absolute, out reverseUri))
{
connectionId = Guid.Empty;
reverseUri = null;
return false;
}

return true;
}
}
}

internal sealed class BidiHttpServerConnectionState : IAsyncDisposable, IAsyncStream
{
private readonly Pipe _pipe = new();

private readonly IpcServer _server;
private readonly BidiHttpListenerState _listenerState;

private readonly CancellationTokenSource _cts = new();
private readonly AsyncLock _lock = new();
private (Guid connectionId, Uri reverseUri)? _connection = null;
private HttpClient? _client;
private Task? _processing = null;
private readonly Lazy<Task> _disposing;

public BidiHttpServerConnectionState(IpcServer server, BidiHttpListenerState listenerState)
{
_server = server;
_listenerState = listenerState;
_disposing = new(DisposeCore);
}

public ValueTask DisposeAsync() => new(_disposing.Value);

private async Task DisposeCore()
{
_cts.Cancel();

_client?.Dispose();

try
{
await (_processing ?? Task.CompletedTask);
}
catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token)
{
// ignored
}

_cts.Dispose();
}

public async Task WaitForConnection(CancellationToken ct)
{
using (await _lock.LockAsync(ct))
{
if (_connection is not null)
{
throw new InvalidOperationException();
}

_connection = await _listenerState.NewConnections.ReadAsync(ct);

_client = new()
{
BaseAddress = _connection.Value.reverseUri,
DefaultRequestHeaders =
{
{ ConnectionIdHeader, _connection.Value.connectionId.ToString() }
}
};

_processing = ProcessContexts(_cts.Token);
}
}

private async Task ProcessContexts(CancellationToken ct)
{
var reader = _listenerState.GetConnectionChannel(_connection!.Value.connectionId);

await foreach (var context in reader.ReadAllAsync(ct))
{
await ProcessContext(context);
}

async Task ProcessContext(HttpListenerContext context)
{
try
{
while (true)
{
var memory = _pipe.Writer.GetMemory();
var cbRead = await context.Request.InputStream.ReadAsync(memory, ct);
if (cbRead is 0)
{
break;
}
_pipe.Writer.Advance(cbRead);
var flushResult = await _pipe.Writer.FlushAsync(ct);
if (flushResult.IsCompleted)
{
break;
}
}
}
finally
{
context.Response.StatusCode = 200;
context.Response.Close();
}
}
}

async ValueTask<int> IAsyncStream.Read(Memory<byte> memory, CancellationToken ct)
{
var readResult = await _pipe.Reader.ReadAsync(ct);

var take = (int)Math.Min(readResult.Buffer.Length, memory.Length);

readResult.Buffer.Slice(start: 0, length: take).CopyTo(memory.Span);
_pipe.Reader.AdvanceTo(readResult.Buffer.GetPosition(take));

return take;
}

async ValueTask IAsyncStream.Write(ReadOnlyMemory<byte> memory, CancellationToken ct)
{
if (_client is null)
{
throw new InvalidOperationException();
}

await _client.PostAsync(
requestUri: "",
new ReadOnlyMemoryContent(memory),
ct);
}

ValueTask IAsyncStream.Flush(CancellationToken ct) => default;
}
Loading