Skip to content

Distributed Collections #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 50 commits into from
Jun 17, 2025
Merged

Distributed Collections #43

merged 50 commits into from
Jun 17, 2025

Conversation

DJGosnell
Copy link
Member

@DJGosnell DJGosnell commented May 23, 2025

This PR introduces a foundational abstraction for remotely synchronized collections and a concrete list interface that builds on it. At its core, INexusCollection captures the lifecycle of a server-backed collection: Once connected, clients can subscribe to the Changed event (an ISubscriptionEvent) to receive notifications whenever the server-side data mutates, and unsubscribe by disposing the returned token.

INexusList inherits from INexusCollection and IEnumerable, offering a list-like API but with each mutation routed to the server. Methods like AddAsync, InsertAsync, RemoveAsync, RemoveAtAsync, ClearAsync, MoveAsync, and ReplaceAsync all return Task, where true indicates the server accepted and applied the operation and false signals an invalid index, non-existent item, or a no-op. This pattern allows client code to react appropriately—retry, log, or ignore—depending on whether the remote operation succeeded. Read-only operations remain synchronous: you can call Contains, IndexOf, retrieve Count, check IsReadOnly (which is always false in server-to-client mode), use the indexer this[int index], or copy contents into a provided array via CopyTo, with the usual argument validation exceptions. All data accessed in the synchronous methods access the current state on the client and may not reflect the latest state on the server if accessed between the server broadcast update and the client side read.

The Changed subscription event abstraction is factored into ISubscriptionEvent offering a Subscribe method that accepts a handler and returns an IDisposable for unsubscription.

Under the hood, this architecture extracts connection handling and event subscription so that other collection types (e.g., sets or maps) can plug into the same infrastructure and implement their own mutation operations according to their specific semantics.

Details

  • Adds new IAsyncEnumerable to channel readers. This significantly eases reading of channels. This is now the refereed method to read channels.
  • Adds base functionality for additional synchronized collections for more types. (Dictionary).
  • Updated NexusGenerator for collection integration generation and additional diagnostics to catch common mistakes and misconfigurations.
  • Added extensive integration and generator tests.
  • Added ISubscriptionEvent to allow for ease of subscribing and unsubscribing from events.
  • Added several customized collections for usage.
  • Extracted ISessionContext from SessionContext.

New Public API

public interface INexusList<T> : INexusCollection, IEnumerable<T>{
    int Count { get; }
    bool IsReadOnly { get; }
    bool Contains(T item);
    void CopyTo(T[] array, int arrayIndex);
    int IndexOf(T item);
    Task<bool> ClearAsync();
    Task<bool> InsertAsync(int index, T item);
    Task<bool> RemoveAtAsync(int index);
    Task<bool> RemoveAsync(T item);
    Task<bool> AddAsync(T item);
    Task<bool> MoveAsync(int fromIndex, int toIndex);
    Task<bool> ReplaceAsync(int index, T value);
    T this[int index] { get; }
}

public interface INexusCollection : IEnumerable
{    
    public NexusCollectionState State { get; }
    public Task DisconnectedTask { get; }
    public Task<bool> ConnectAsync(CancellationToken token = default);
    public Task  DisconnectAsync();
    public ISubscriptionEvent<NexusCollectionChangedEventArgs> Changed { get; }
}

public class NexusCollectionAttribute : Attribute
{
    public ushort Id { get; init; } = 0;
    public NexusCollectionMode Mode { get; init; } = NexusCollectionMode.ServerToClient;
    public bool Ignore { get; init; }
}

public enum NexusCollectionMode
{
    Unset,
    ServerToClient,
    BiDrirectional
}

The *Async methods all execute the changes on the server and wait for the server confirmation of the change operation. If the Task returns false, the value was not updated on the server. This may indicate that the list was cleared or the operation was rendered a noop.

Synchronized List Usage

The list must be decorated with the NexusCollectionAttribute, otherwise the Server nexus it will throw. Synchronized collections are only allowed on the server nexus. Collections are to be configured as either BiDrirectional or ServerToClient.

public partial interface IClientNexus { }
public partial interface IServerNexus
{
    [NexusCollection(NexusCollectionMode.BiDrirectional)]
    INexusList<int> IntList { get; }
}

[Nexus<IClientNexus, IServerNexus>(NexusType = NexusType.Client)]
public partial class ClientNexus { }

[Nexus<IServerNexus, IClientNexus>(NexusType = NexusType.Server)]
public partial class ServerNexus { }

await ServerNexus.CreateServer(..., ...).StartAsync();

var client = ClientNexus.CreateClient(..., ...);
await client.ConnectAsync();

// Connect to the list (internally starts a duplex pipe for communication).
await client.Proxy.IntList.ConnectAsync();

// Add the value to the synchronized collection. This will return upon acknowledgement by the server of addition.
await client.Proxy.IntList.AddAsync(12345);

IAsyncEnumerable Usage

The preferred method of reading channels is now using the IAsyncEnumerable on the provided INexusChannelReader. This allows for the most efficient buffering of data while reading and simplifies channel closure whether graceful or not.

var writer = await pipe.GetUnmanagedChannelWriter<int>();
int counter = 0;
await foreach (var msg in await pipe.GetChannelReader<ComplexMessage>())
{
    // Do something with the message.
}

Breaking Changes

  • ICollectionConfigurer is now a required interface of Nexuses.
  • Properties are no longer allowed on the Nexus classes other than for synchronized collections.

Misc

  • Added ConfigureAwaitChecker.Analyzer package for automatic checking of ConfigureAwait(false) where required.
  • Fixed several places that were missing ConfigureAwait.

DJGosnell and others added 30 commits May 19, 2025 17:19
Added tests.
…iteration on multiple threads.

Started client work.
Added Clear to Fizzing operations.
Client connects to server collection now.
Continued work on messaging.
… Task.Run, but with a state passed.

Initial channel broadcast working.  Items are being sent out of order after buffer has been filled on the server.
Started work on Generator.
Processed all diagnostics instead of until one failed.
Changed errors to display on the interface if it is in the same assembly or the nexus implementation if not.
Started extracting common logic into NexusCollection class.
Further extraction of common logic.
Added SnapshotTests.
Added CircularListTests.
@DJGosnell DJGosnell merged commit 29f8c95 into master Jun 17, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant