Skip to content

Commit fc777f3

Browse files
Cherry-picked from 0c92aac (#6048)
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
1 parent 7293c6e commit fc777f3

File tree

2 files changed

+182
-55
lines changed

2 files changed

+182
-55
lines changed

src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs

+116-33
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@
1010
using System.Threading;
1111
using System.Threading.Tasks;
1212
using Akka.Pattern;
13-
using Akka.Routing;
1413
using Akka.Streams.Dsl;
1514
using Akka.Streams.TestKit;
1615
using Akka.TestKit;
1716
using FluentAssertions;
18-
using Nito.AsyncEx.Synchronous;
1917
using Xunit;
2018
using Xunit.Abstractions;
2119
using System.Collections.Generic;
@@ -24,6 +22,9 @@
2422
using Akka.Streams.TestKit.Tests;
2523
using Akka.Streams.Tests.Actor;
2624
using Reactive.Streams;
25+
using System.Runtime.CompilerServices;
26+
using Akka.Util;
27+
using FluentAssertions.Extensions;
2728

2829
namespace Akka.Streams.Tests.Dsl
2930
{
@@ -33,16 +34,16 @@ public class AsyncEnumerableSpec : AkkaSpec
3334
private ActorMaterializer Materializer { get; }
3435
private ITestOutputHelper _helper;
3536
public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
36-
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
37-
helper)
37+
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
38+
helper)
3839
{
3940
_helper = helper;
4041
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
4142
Materializer = ActorMaterializer.Create(Sys, settings);
4243
}
4344

4445

45-
[Fact]
46+
[Fact]
4647
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
4748
{
4849
var input = Enumerable.Range(1, 6).ToList();
@@ -67,7 +68,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
6768

6869
caught.ShouldBeTrue();
6970
}
70-
71+
7172
[Fact]
7273
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
7374
{
@@ -79,7 +80,7 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
7980
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
8081
output = output.Skip(1).ToArray();
8182
}
82-
output.Length.ShouldBe(0,"Did not receive all elements!");
83+
output.Length.ShouldBe(0, "Did not receive all elements!");
8384
}
8485

8586
[Fact]
@@ -93,15 +94,15 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
9394
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
9495
output = output.Skip(1).ToArray();
9596
}
96-
output.Length.ShouldBe(0,"Did not receive all elements!");
97+
output.Length.ShouldBe(0, "Did not receive all elements!");
9798

9899
output = input.ToArray();
99100
await foreach (var a in asyncEnumerable)
100101
{
101102
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
102103
output = output.Skip(1).ToArray();
103104
}
104-
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");
105+
output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
105106
}
106107

107108

@@ -112,7 +113,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
112113
var probe = this.CreatePublisherProbe<int>();
113114
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
114115

115-
var a = Task.Run( async () =>
116+
var a = Task.Run(async () =>
116117
{
117118
await foreach (var notused in task)
118119
{
@@ -123,19 +124,20 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
123124
//we want to send messages so we aren't just waiting forever.
124125
probe.SendNext(1);
125126
probe.SendNext(2);
126-
bool thrown = false;
127+
var thrown = false;
127128
try
128129
{
129130
await a;
130131
}
131-
catch (StreamDetachedException e)
132-
{
133-
thrown = true;
134-
}
132+
catch (StreamDetachedException e)
133+
{
134+
thrown = true;
135+
}
135136
catch (AbruptTerminationException e)
136137
{
137138
thrown = true;
138139
}
140+
139141
thrown.ShouldBeTrue();
140142
}
141143

@@ -151,47 +153,128 @@ async Task ShouldThrow()
151153
{
152154
await foreach (var a in task)
153155
{
154-
155156
}
156157
}
157158

158159
await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
159160
}
160161

161-
[Fact]
162-
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
162+
[Fact]
163+
public void
164+
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
163165
{
164-
Func<IAsyncEnumerable<int>> range = () =>
165-
{
166-
return RangeAsync(1, 100);
167-
};
166+
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
168167
var subscriber = this.CreateManualSubscriberProbe<int>();
169168

170-
Source.From(range)
169+
Source.From(Range)
171170
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
172171

173172
var subscription = subscriber.ExpectSubscription();
174173
subscription.Request(100);
175-
for (int i = 1; i <= 20; i++)
174+
subscriber.ExpectComplete();
175+
}
176+
177+
[Fact]
178+
public void AsyncEnumerableSource_Must_Process_All_Elements()
179+
{
180+
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
181+
var subscriber = this.CreateManualSubscriberProbe<int>();
182+
183+
Source.From(Range)
184+
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
185+
186+
var subscription = subscriber.ExpectSubscription();
187+
subscription.Request(101);
188+
189+
subscriber.ExpectNextN(Enumerable.Range(0, 100));
190+
191+
subscriber.ExpectComplete();
192+
}
193+
194+
[Fact]
195+
public void AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
196+
{
197+
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
198+
var subscriber = this.CreateManualSubscriberProbe<int>();
199+
200+
Source.From(Range)
201+
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
202+
203+
var subscription = subscriber.ExpectSubscription();
204+
subscription.Request(101);
205+
206+
subscriber.ExpectNextN(Enumerable.Range(0, 50));
207+
208+
var exception = subscriber.ExpectError();
209+
210+
// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
211+
exception.Should().BeOfType<TestException>();
212+
exception.Message.Should().Be("BOOM!");
213+
}
214+
215+
[Fact]
216+
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
217+
{
218+
var latch = new AtomicBoolean();
219+
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
220+
var subscriber = this.CreateManualSubscriberProbe<int>();
221+
222+
Source.From(Range)
223+
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
224+
225+
var subscription = subscriber.ExpectSubscription();
226+
subscription.Request(50);
227+
subscriber.ExpectNextN(Enumerable.Range(0, 50));
228+
subscription.Cancel();
229+
230+
// The cancellation token inside the IAsyncEnumerable should be cancelled
231+
await WithinAsync(3.Seconds(), async () => latch.Value);
232+
}
233+
234+
private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
235+
[EnumeratorCancellation] CancellationToken token = default)
236+
{
237+
foreach (var i in Enumerable.Range(start, count))
176238
{
177-
var next = subscriber.ExpectNext(i);
178-
_helper.WriteLine(i.ToString());
239+
await Task.Delay(10, token);
240+
if(token.IsCancellationRequested)
241+
yield break;
242+
yield return i;
179243
}
244+
}
180245

181-
//subscriber.ExpectComplete();
246+
private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int count, int throwAt,
247+
[EnumeratorCancellation] CancellationToken token = default)
248+
{
249+
foreach (var i in Enumerable.Range(start, count))
250+
{
251+
if(token.IsCancellationRequested)
252+
yield break;
253+
254+
if (i == throwAt)
255+
throw new TestException("BOOM!");
256+
257+
yield return i;
258+
}
182259
}
183260

184-
static async IAsyncEnumerable<int> RangeAsync(int start, int count)
261+
private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
262+
[EnumeratorCancellation] CancellationToken token = default)
185263
{
186-
for (var i = 0; i < count; i++)
264+
token.Register(() =>
265+
{
266+
latch.GetAndSet(true);
267+
});
268+
foreach (var i in Enumerable.Range(start, count))
187269
{
188-
await Task.Delay(i);
189-
yield return start + i;
270+
if(token.IsCancellationRequested)
271+
yield break;
272+
273+
yield return i;
190274
}
191275
}
192276

193277
}
194278
#else
195279
#endif
196-
197-
}
280+
}

0 commit comments

Comments
 (0)