Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@ public class AsyncEnumerableTests
{
protected static readonly IAsyncEnumerable<int> Return42 = new[] { 42 }.ToAsyncEnumerable();

protected async Task AssertThrowsAsync<TException>(Task t)
protected async Task<TException> AssertThrowsAsync<TException>(Task t)
where TException : Exception
{
await Assert.ThrowsAsync<TException>(() => t);
return await Assert.ThrowsAsync<TException>(() => t);
}

protected async Task AssertThrowsAsync(Task t, Exception e)
{
try
{
await t;
}
catch (Exception ex)
{
Assert.Same(e, ex);
}
var ex = await Assert.ThrowsAnyAsync<Exception>(() => t);
Assert.Same(e, ex);
}

protected Task AssertThrowsAsync<T>(ValueTask<T> t, Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -283,6 +284,48 @@ public async Task ToAsyncEnumerable_Observable_Throw()
await AssertThrowsAsync(e.MoveNextAsync(), ex);
}

[Fact]
public async Task ToAsyncEnumerable_Observable_Throw_ActiveException()
{
// No inlining as we're asserting that this function must be mentioned in the exception stacktrace
[MethodImpl(MethodImplOptions.NoInlining)]
void ThrowsException()
{
throw new Exception("Bang!");
}

var subscribed = false;

var xs = new MyObservable<int>(obs =>
{
subscribed = true;

try
{
ThrowsException();
}
catch (Exception ex)
{
// `ex` is active at this point as it has been thrown.
// Therefore the stack trace should be captured by ToAsyncEnumerable.
// See 'Remarks' at https://docs.microsoft.com/en-us/dotnet/api/system.runtime.exceptionservices.exceptiondispatchinfo.capture
obs.OnError(ex);
}

return new MyDisposable(() => { });
}).ToAsyncEnumerable();

Assert.False(subscribed);

var e = xs.GetAsyncEnumerator();

// NB: Breaking change to align with lazy nature of async iterators.
// Assert.True(subscribed);

var actual = await AssertThrowsAsync<Exception>(e.MoveNextAsync().AsTask());
Assert.Contains(nameof(ThrowsException), actual.StackTrace);
}

[Fact]
public async Task ToAsyncEnumerable_Observable_Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -31,7 +32,7 @@ private sealed class ObservableAsyncEnumerable<TSource> : AsyncIterator<TSource>
private readonly IObservable<TSource> _source;

private ConcurrentQueue<TSource>? _values = new ConcurrentQueue<TSource>();
private Exception? _error;
private ExceptionDispatchInfo? _error;
private bool _completed;
private TaskCompletionSource<bool>? _signal;
private IDisposable? _subscription;
Expand Down Expand Up @@ -95,7 +96,7 @@ protected override async ValueTask<bool> MoveNextCore()

if (error != null)
{
throw error;
error.Throw();
}

return false;
Expand All @@ -120,7 +121,7 @@ public void OnCompleted()

public void OnError(Exception error)
{
_error = error;
_error = ExceptionDispatchInfo.Capture(error);
Volatile.Write(ref _completed, true);

DisposeSubscription();
Expand Down