Skip to content

Commit 9ae7e8b

Browse files
authored
add filtering to non-subscription reads of $all (#278)
1 parent def981a commit 9ae7e8b

File tree

6 files changed

+93
-27
lines changed

6 files changed

+93
-27
lines changed

src/EventStore.Client.Streams/EventStoreClient.Read.cs

+59-11
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
41
using System.Runtime.CompilerServices;
5-
using System.Threading;
62
using System.Threading.Channels;
7-
using System.Threading.Tasks;
83
using EventStore.Client.Streams;
94
using Grpc.Core;
105
using static EventStore.Client.Streams.ReadResp;
@@ -42,23 +37,76 @@ public ReadAllStreamResult ReadAllAsync(
4237
Options = new() {
4338
ReadDirection = direction switch {
4439
Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards,
45-
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
46-
_ => throw InvalidOption(direction)
40+
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
41+
_ => throw InvalidOption(direction)
4742
},
4843
ResolveLinks = resolveLinkTos,
4944
All = new() {
5045
Position = new() {
51-
CommitPosition = position.CommitPosition,
46+
CommitPosition = position.CommitPosition,
5247
PreparePosition = position.PreparePosition
5348
}
5449
},
55-
Count = (ulong)maxCount,
56-
UuidOption = new() {Structured = new()},
57-
NoFilter = new(),
50+
Count = (ulong)maxCount,
51+
UuidOption = new() {Structured = new()},
52+
NoFilter = new(),
5853
ControlOption = new() {Compatibility = 1}
5954
}
6055
}, Settings, deadline, userCredentials, cancellationToken);
6156
}
57+
58+
/// <summary>
59+
/// Asynchronously reads all events with filtering.
60+
/// </summary>
61+
/// <param name="direction">The <see cref="Direction"/> in which to read.</param>
62+
/// <param name="position">The <see cref="Position"/> to start reading from.</param>
63+
/// <param name="eventFilter">The <see cref="IEventFilter"/> to apply.</param>
64+
/// <param name="maxCount">The maximum count to read.</param>
65+
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
66+
/// <param name="deadline"></param>
67+
/// <param name="userCredentials">The optional <see cref="UserCredentials"/> to perform operation with.</param>
68+
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
69+
/// <returns></returns>
70+
public ReadAllStreamResult ReadAllAsync(
71+
Direction direction,
72+
Position position,
73+
IEventFilter eventFilter,
74+
long maxCount = long.MaxValue,
75+
bool resolveLinkTos = false,
76+
TimeSpan? deadline = null,
77+
UserCredentials? userCredentials = null,
78+
CancellationToken cancellationToken = default
79+
) {
80+
if (maxCount <= 0) {
81+
throw new ArgumentOutOfRangeException(nameof(maxCount));
82+
}
83+
84+
var readReq = new ReadReq {
85+
Options = new() {
86+
ReadDirection = direction switch {
87+
Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards,
88+
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
89+
_ => throw InvalidOption(direction)
90+
},
91+
ResolveLinks = resolveLinkTos,
92+
All = new() {
93+
Position = new() {
94+
CommitPosition = position.CommitPosition,
95+
PreparePosition = position.PreparePosition
96+
}
97+
},
98+
Count = (ulong)maxCount,
99+
UuidOption = new() { Structured = new() },
100+
ControlOption = new() { Compatibility = 1 },
101+
Filter = GetFilterOptions(eventFilter)
102+
}
103+
};
104+
105+
return new ReadAllStreamResult(async _ => {
106+
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
107+
return channelInfo.CallInvoker;
108+
}, readReq, Settings, deadline, userCredentials, cancellationToken);
109+
}
62110

63111
/// <summary>
64112
/// A class that represents the result of a read operation on the $all stream. You may either enumerate this instance directly or <see cref="Messages"/>. Do not enumerate more than once.

src/EventStore.Client.Streams/EventStoreClient.cs

+6-6
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,11 @@ private StreamAppender CreateStreamAppender() {
7373
}
7474
}
7575

76-
private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
77-
SubscriptionFilterOptions? filterOptions) {
78-
if (filterOptions == null) {
76+
private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) {
77+
if (filter == null) {
7978
return null;
8079
}
8180

82-
var filter = filterOptions.Filter;
83-
8481
var options = filter switch {
8582
StreamFilter => new ReadReq.Types.Options.Types.FilterOptions {
8683
StreamIdentifier = (filter.Prefixes, filter.Regex) switch {
@@ -127,11 +124,14 @@ private StreamAppender CreateStreamAppender() {
127124
options.Count = new Empty();
128125
}
129126

130-
options.CheckpointIntervalMultiplier = filterOptions.CheckpointInterval;
127+
options.CheckpointIntervalMultiplier = checkpointInterval;
131128

132129
return options;
133130
}
134131

132+
private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions)
133+
=> filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval);
134+
135135
/// <inheritdoc />
136136
public override void Dispose() {
137137
if (_streamAppenderLazy.IsValueCreated)

test/EventStore.Client.Streams.Tests/Read/ReadAllEventsFixture.cs

+4-6
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ public ReadAllEventsFixture() {
1010
userCredentials: TestCredentials.Root
1111
);
1212

13-
Events = Enumerable
14-
.Concat(
15-
CreateTestEvents(20),
16-
CreateTestEvents(2, metadataSize: 1_000_000)
17-
)
13+
Events = CreateTestEvents(20)
14+
.Concat(CreateTestEvents(2, metadataSize: 1_000_000))
15+
.Concat(CreateTestEvents(2, AnotherTestEventType))
1816
.ToArray();
1917

2018
ExpectedStreamName = GetStreamName();
@@ -38,4 +36,4 @@ public ReadAllEventsFixture() {
3836

3937
public EventBinaryData ExpectedFirstEvent { get; private set; }
4038
public EventBinaryData ExpectedLastEvent { get; private set; }
41-
}
39+
}

test/EventStore.Client.Streams.Tests/Read/read_all_events_backward.cs

+10-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ public async Task with_timeout_fails_when_operation_expired() {
7272
ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
7373
}
7474

75+
[Fact]
76+
public async Task filter_events_by_type() {
77+
var result = await Fixture.Streams
78+
.ReadAllAsync(Direction.Backwards, Position.End, EventTypeFilter.Prefix(EventStoreFixture.AnotherTestEventTypePrefix))
79+
.ToListAsync();
80+
81+
result.ForEach(x => x.Event.EventType.ShouldStartWith(EventStoreFixture.AnotherTestEventTypePrefix));
82+
}
83+
7584
[Fact(Skip = "Not Implemented")]
7685
public Task be_able_to_read_all_one_by_one_until_end_of_stream() => throw new NotImplementedException();
7786

@@ -80,4 +89,4 @@ public async Task with_timeout_fails_when_operation_expired() {
8089

8190
[Fact(Skip = "Not Implemented")]
8291
public Task when_got_int_max_value_as_maxcount_should_throw() => throw new NotImplementedException();
83-
}
92+
}

test/EventStore.Client.Streams.Tests/Read/read_all_events_forward.cs

+10-1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,15 @@ await result.Messages.ToArrayAsync()
139139
);
140140
}
141141

142+
[Fact]
143+
public async Task filter_events_by_type() {
144+
var result = await Fixture.Streams
145+
.ReadAllAsync(Direction.Forwards, Position.Start, EventTypeFilter.Prefix(EventStoreFixture.AnotherTestEventTypePrefix))
146+
.ToListAsync();
147+
148+
result.ForEach(x => x.Event.EventType.ShouldStartWith(EventStoreFixture.AnotherTestEventTypePrefix));
149+
}
150+
142151
[Fact(Skip = "Not Implemented")]
143152
public Task be_able_to_read_all_one_by_one_until_end_of_stream() => throw new NotImplementedException();
144153

@@ -147,4 +156,4 @@ await result.Messages.ToArrayAsync()
147156

148157
[Fact(Skip = "Not Implemented")]
149158
public Task when_got_int_max_value_as_maxcount_should_throw() => throw new NotImplementedException();
150-
}
159+
}

test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs

+4-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
namespace EventStore.Client.Tests;
55

66
public partial class EventStoreFixture {
7-
public const string TestEventType = "tst";
7+
public const string TestEventType = "test-event-type";
8+
public const string AnotherTestEventTypePrefix = "another";
9+
public const string AnotherTestEventType = $"{AnotherTestEventTypePrefix}-test-event-type";
810

911
public T NewClient<T>(Action<EventStoreClientSettings> configure) where T : EventStoreClientBase, new() =>
1012
(T)Activator.CreateInstance(typeof(T), new object?[] { ClientSettings.With(configure) })!;
@@ -50,4 +52,4 @@ public async Task RestartService(TimeSpan delay) {
5052
await Streams.WarmUp();
5153
Log.Information("Service restarted.");
5254
}
53-
}
55+
}

0 commit comments

Comments
 (0)