Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Add support for unsubscribing from an IObservable<T> streaming result #885

@Tragetaschen

Description

@Tragetaschen

This is likely related to #481

I have ported my PersistentConnection from SignalR 2.2 to a Hub method returning an IObservable<T>. By desing, this observable doesn't complete throughout the lifetime of the application.

Whenever a client disconnects (I call connection.stop() in TS), the subscribed observer doesn't unsubscribe and all of them just pile up in my observable. SignalR just loses the subscription:
https://github.com/aspnet/SignalR/blob/rel/1.0.0-alpha1/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs#L39
I've quickly hacked an unsubscribe tied to the Hub's Dispose method:

public class MyHub : Hub
{
    private readonly MyObservable myObservable;

    private IDisposable disposeSubscription;

    public MyHub(MyObservable myObservable) => this.myObservable = myObservable;

    public IObservable<object> Data() => new wrappingObservable(myObservable, this);

    protected override void Dispose(bool disposing)
    {
        disposeSubscription?.Dispose();
        base.Dispose(disposing);
    }

    private class wrappingObservable : IObservable<object>
    {
        private readonly IObservable<object> original;
        private readonly MyHub myHub;

        public wrappingObservable(
            IObservable<object> original,
            MyHub myHub
        )
        {
            this.original = original;
            this.myHub = myHub;
        }

        public IDisposable Subscribe(IObserver<object> observer)
        {
            var disposable = original.Subscribe(observer);
            myHub.disposeSubscription = disposable;
            return disposable;
        }
    }
}

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions