Skip to content

Commit 4a841ea

Browse files
addressed PR comments
1 parent 23c2a35 commit 4a841ea

File tree

8 files changed

+267
-143
lines changed

8 files changed

+267
-143
lines changed
Lines changed: 192 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics.Metrics;
4-
using System.Linq;
54
using EventStore.Core.Data;
5+
using EventStore.Core.Messages;
66
using EventStore.Core.Services;
7+
using EventStore.Core.Services.Storage.ReaderIndex;
78
using EventStore.Core.TransactionLog.LogRecords;
89
using EventStore.Core.XUnit.Tests.Metrics;
910
using Xunit;
@@ -14,35 +15,47 @@ public class SubscriptionTrackerTests : IDisposable {
1415
private const string StreamName = "stream";
1516
private const long EndOfAllStream = 783L;
1617
private const long EndOfStream = 10L;
18+
private const long LogPosition = 849L;
19+
private const long EventNumber = 11L;
1720

18-
private readonly TestMeterListener<double> _completionListener;
1921
private readonly TestMeterListener<long> _subscriptionPositionListener;
2022
private readonly TestMeterListener<long> _streamPositionListener;
2123
private readonly Meter _meter;
2224
private readonly SubscriptionTracker _sut;
2325
private readonly Guid _subscriptionId;
2426
private readonly KeyValuePair<string, object>[] _streamTags;
2527
private readonly KeyValuePair<string, object>[] _allStreamTags;
26-
private readonly EventRecord _eventRecord;
2728
private readonly ResolvedEvent _resolvedEvent;
29+
private readonly FakeReadIndex _readIndex;
30+
private readonly TestMeterListener<long> _subscriptionCountListener;
2831

2932
public SubscriptionTrackerTests() {
3033
_meter = new Meter("eventstore");
3134

3235
var metric = new SubscriptionMetric(_meter, "test");
33-
_completionListener = new TestMeterListener<double>(_meter);
3436
_streamPositionListener = new TestMeterListener<long>(_meter);
3537
_subscriptionPositionListener = new TestMeterListener<long>(_meter);
38+
_subscriptionCountListener = new TestMeterListener<long>(_meter);
3639

3740
_sut = new SubscriptionTracker(metric);
3841

3942
_subscriptionId = Guid.NewGuid();
4043

41-
_streamTags = [new("streamName", StreamName), new("subscriptionId", _subscriptionId)];
42-
_allStreamTags = [new("streamName", SystemStreams.AllStream), new("subscriptionId", _subscriptionId)];
43-
_eventRecord = new EventRecord(11L, 849L, Guid.NewGuid(), Guid.NewGuid(), 849L, 0, StreamName, 1L,
44-
DateTime.Now, PrepareFlags.SingleWrite, "-", [], []);
45-
_resolvedEvent = ResolvedEvent.ForUnresolvedEvent(_eventRecord, _eventRecord.LogPosition);
44+
_streamTags = [new("stream-name", StreamName), new("subscription-id", _subscriptionId)];
45+
_allStreamTags = [new("stream-name", SystemStreams.AllStream), new("subscription-id", _subscriptionId)];
46+
_resolvedEvent = ResolvedEvent.ForUnresolvedEvent(
47+
new EventRecord(EventNumber, LogPosition, Guid.NewGuid(), Guid.NewGuid(), LogPosition, 0, StreamName, 1L,
48+
DateTime.Now, PrepareFlags.SingleWrite, "-", [], []), LogPosition);
49+
_readIndex = new FakeReadIndex();
50+
}
51+
52+
[Fact]
53+
public void initial_state() {
54+
Observe();
55+
56+
var actual = RetrieveMeasurements();
57+
58+
Assert.Equivalent(new[] { new { Value = 0L } }, actual);
4659
}
4760

4861
[Fact]
@@ -53,13 +66,25 @@ public void add_subscription_stream() {
5366

5467
var actual = RetrieveMeasurements();
5568

56-
Assert.Equivalent(new TestMeterListener<double>.TestMeasurement[] {
57-
new() { Value = 0.0, Tags = _streamTags },
58-
new() { Value = 0.0, Tags = _streamTags },
59-
new() { Value = EndOfStream, Tags = _streamTags }
69+
Assert.Equivalent(new[] {
70+
new { Value = 0L, Tags = _streamTags },
71+
new { Value = EndOfStream, Tags = _streamTags },
72+
new { Value = 1L, Tags = Array.Empty<KeyValuePair<string, object>>() }
6073
}, actual);
6174
}
6275

76+
[Fact]
77+
public void remove_subscription_stream() {
78+
_sut.AddSubscription(_subscriptionId, StreamName, EndOfStream);
79+
_sut.RemoveSubscription(_subscriptionId);
80+
81+
Observe();
82+
83+
var actual = RetrieveMeasurements();
84+
85+
Assert.Equivalent(new[] { new { Value = 0L } }, actual);
86+
}
87+
6388
[Fact]
6489
public void add_subscription_all() {
6590
_sut.AddSubscription(_subscriptionId, null, EndOfAllStream);
@@ -68,106 +93,219 @@ public void add_subscription_all() {
6893

6994
var actual = RetrieveMeasurements();
7095

71-
Assert.Equivalent(new TestMeterListener<double>.TestMeasurement[] {
72-
new() { Value = 0.0, Tags = _allStreamTags },
73-
new() { Value = 0.0, Tags = _allStreamTags },
74-
new() { Value = EndOfAllStream, Tags = _allStreamTags }
96+
Assert.Equivalent(new[] {
97+
new { Value = 0L, Tags = _allStreamTags },
98+
new { Value = EndOfAllStream, Tags = _allStreamTags },
99+
new { Value = 1L, Tags = Array.Empty<KeyValuePair<string, object>>() }
75100
}, actual);
76101
}
77102

103+
[Fact]
104+
public void remove_subscription_all() {
105+
_sut.AddSubscription(_subscriptionId, null, EndOfAllStream);
106+
_sut.RemoveSubscription(_subscriptionId);
107+
108+
Observe();
109+
110+
var actual = RetrieveMeasurements();
111+
112+
Assert.Equivalent(new[] { new { Value = 0L } }, actual);
113+
}
78114
[Fact]
79115
public void event_committed_stream() {
80116
_sut.AddSubscription(_subscriptionId, StreamName, EndOfStream);
81-
_sut.RecordEvent(_eventRecord);
117+
_sut.UpdateStreamPositions(_readIndex);
82118

83119
Observe();
84120

85121
var actual = RetrieveMeasurements();
86122

87-
Assert.Equivalent(new TestMeterListener<double>.TestMeasurement[] {
88-
new() { Value = 0.0, Tags = _streamTags },
89-
new() { Value = 0.0, Tags = _streamTags },
90-
new() { Value = _eventRecord.EventNumber, Tags = _streamTags }
123+
Assert.Equivalent(new[] {
124+
new { Value = 0L, Tags = _streamTags },
125+
new { Value = EventNumber, Tags = _streamTags },
126+
new { Value = 1L, Tags = Array.Empty<KeyValuePair<string, object>>() }
91127
}, actual);
92128
}
93129

94130
[Fact]
95131
public void event_committed_all() {
96132
_sut.AddSubscription(_subscriptionId, null, EndOfAllStream);
97-
_sut.RecordEvent(_eventRecord);
133+
_sut.UpdateStreamPositions(_readIndex);
98134

99135
Observe();
100136

101137
var actual = RetrieveMeasurements();
102138

103-
Assert.Equivalent(new TestMeterListener<double>.TestMeasurement[] {
104-
new() { Value = 0.0, Tags = _allStreamTags },
105-
new() { Value = 0.0, Tags = _allStreamTags },
106-
new() { Value = _eventRecord.LogPosition, Tags = _allStreamTags }
139+
Assert.Equivalent(new[] {
140+
new { Value = 0L, Tags = _allStreamTags },
141+
new { Value = LogPosition, Tags = _allStreamTags },
142+
new { Value = 1L, Tags = Array.Empty<KeyValuePair<string, object>>() }
107143
}, actual);
108144
}
109145

110146
[Fact]
111147
public void event_processed_stream() {
112148
_sut.AddSubscription(_subscriptionId, StreamName, EndOfStream);
113-
_sut.RecordEvent(_eventRecord);
149+
_sut.UpdateStreamPositions(_readIndex);
114150
_sut.ProcessEvent(_resolvedEvent);
115151

116152
Observe();
117153

118154
var actual = RetrieveMeasurements();
119155

120-
Assert.Equivalent(new TestMeterListener<double>.TestMeasurement[] {
121-
new() { Value = 1.0, Tags = _streamTags },
122-
new() { Value = _eventRecord.EventNumber, Tags = _streamTags },
123-
new() { Value = _eventRecord.EventNumber, Tags = _streamTags }
156+
Assert.Equivalent(new[] {
157+
new { Value = EventNumber, Tags = _streamTags },
158+
new { Value = EventNumber, Tags = _streamTags },
159+
new { Value = 1L, Tags = Array.Empty<KeyValuePair<string, object>>() }
124160
}, actual);
125161
}
126162

127163
[Fact]
128164
public void event_processed_all() {
129165
_sut.AddSubscription(_subscriptionId, null, EndOfAllStream);
130-
_sut.RecordEvent(_eventRecord);
166+
_sut.UpdateStreamPositions(_readIndex);
131167
_sut.ProcessEvent(_resolvedEvent);
132168

133169
Observe();
134170

135171
var actual = RetrieveMeasurements();
136172

137-
Assert.Equivalent(new TestMeterListener<double>.TestMeasurement[] {
138-
new() { Value = 1.0, Tags = _allStreamTags },
139-
new() { Value = _eventRecord.LogPosition, Tags = _allStreamTags },
140-
new() { Value = _eventRecord.LogPosition, Tags = _allStreamTags }
173+
Assert.Equivalent(new[] {
174+
new { Value = LogPosition, Tags = _allStreamTags },
175+
new { Value = LogPosition, Tags = _allStreamTags },
176+
new { Value = 1L, Tags = Array.Empty<KeyValuePair<string, object>>() }
141177
}, actual);
142178
}
143179

144180
private void Observe() {
145-
_completionListener.Observe();
146181
_subscriptionPositionListener.Observe();
147182
_streamPositionListener.Observe();
183+
_subscriptionCountListener.Observe();
148184
}
149185

150-
private TestMeterListener<double>.TestMeasurement[] RetrieveMeasurements() => [
151-
_completionListener.RetrieveMeasurements("test-completed")
152-
.Single(),
153-
_subscriptionPositionListener.RetrieveMeasurements("test-subscription-position")
154-
.Select(x => new TestMeterListener<double>.TestMeasurement {
155-
Tags = x.Tags,
156-
Value = x.Value
157-
})
158-
.Single(),
159-
_streamPositionListener.RetrieveMeasurements("test-stream-position")
160-
.Select(x => new TestMeterListener<double>.TestMeasurement {
161-
Tags = x.Tags,
162-
Value = x.Value
163-
})
164-
.Single()
186+
private TestMeterListener<long>.TestMeasurement[] RetrieveMeasurements() => [
187+
.._subscriptionPositionListener.RetrieveMeasurements("test-subscription-position"),
188+
.._streamPositionListener.RetrieveMeasurements("test-stream-position"),
189+
.._subscriptionCountListener.RetrieveMeasurements("test-subscription-count")
165190
];
166191

167192
public void Dispose() {
168-
_completionListener?.Dispose();
169193
_subscriptionPositionListener?.Dispose();
170194
_streamPositionListener?.Dispose();
171195
_meter?.Dispose();
172196
}
197+
198+
private class FakeReadIndex : IReadIndex<string> {
199+
public long LastIndexedPosition => LogPosition;
200+
public string GetStreamId(string streamName) => streamName;
201+
public long GetStreamLastEventNumber(string streamId) => EventNumber;
202+
203+
#region Not Implemented
204+
205+
public ReadIndexStats GetStatistics() {
206+
throw new NotImplementedException();
207+
}
208+
209+
public IndexReadAllResult ReadAllEventsForward(TFPos pos, int maxCount) {
210+
throw new NotImplementedException();
211+
}
212+
213+
public IndexReadAllResult ReadAllEventsBackward(TFPos pos, int maxCount) {
214+
throw new NotImplementedException();
215+
}
216+
217+
public IndexReadAllResult ReadAllEventsForwardFiltered(TFPos pos, int maxCount, int maxSearchWindow,
218+
IEventFilter eventFilter) {
219+
throw new NotImplementedException();
220+
}
221+
222+
public IndexReadAllResult
223+
ReadAllEventsBackwardFiltered(TFPos pos, int maxCount, int maxSearchWindow, IEventFilter eventFilter) {
224+
throw new NotImplementedException();
225+
}
226+
227+
public void Close() {
228+
throw new NotImplementedException();
229+
}
230+
231+
public void Dispose() {
232+
throw new NotImplementedException();
233+
}
234+
235+
public IIndexWriter<string> IndexWriter { get; }
236+
237+
public IndexReadEventResult ReadEvent(string streamName, string streamId, long eventNumber) {
238+
throw new NotImplementedException();
239+
}
240+
241+
public IndexReadStreamResult ReadStreamEventsBackward(string streamName, string streamId, long fromEventNumber,
242+
int maxCount) {
243+
throw new NotImplementedException();
244+
}
245+
246+
public IndexReadStreamResult ReadStreamEventsForward(string streamName, string streamId, long fromEventNumber,
247+
int maxCount) {
248+
throw new NotImplementedException();
249+
}
250+
251+
public IndexReadEventInfoResult ReadEventInfo_KeepDuplicates(string streamId, long eventNumber) {
252+
throw new NotImplementedException();
253+
}
254+
255+
public IndexReadEventInfoResult ReadEventInfoForward_KnownCollisions(string streamId, long fromEventNumber,
256+
int maxCount,
257+
long beforePosition) {
258+
throw new NotImplementedException();
259+
}
260+
261+
public IndexReadEventInfoResult ReadEventInfoForward_NoCollisions(ulong stream, long fromEventNumber,
262+
int maxCount,
263+
long beforePosition) {
264+
throw new NotImplementedException();
265+
}
266+
267+
public IndexReadEventInfoResult ReadEventInfoBackward_KnownCollisions(string streamId, long fromEventNumber,
268+
int maxCount,
269+
long beforePosition) {
270+
throw new NotImplementedException();
271+
}
272+
273+
public IndexReadEventInfoResult ReadEventInfoBackward_NoCollisions(ulong stream,
274+
Func<ulong, string> getStreamId, long fromEventNumber,
275+
int maxCount, long beforePosition) {
276+
throw new NotImplementedException();
277+
}
278+
279+
public bool IsStreamDeleted(string streamId) {
280+
throw new NotImplementedException();
281+
}
282+
283+
public long GetStreamLastEventNumber_KnownCollisions(string streamId, long beforePosition) {
284+
throw new NotImplementedException();
285+
}
286+
287+
public long GetStreamLastEventNumber_NoCollisions(ulong stream, Func<ulong, string> getStreamId,
288+
long beforePosition) {
289+
throw new NotImplementedException();
290+
}
291+
292+
public StreamMetadata GetStreamMetadata(string streamId) {
293+
throw new NotImplementedException();
294+
}
295+
296+
public StorageMessage.EffectiveAcl GetEffectiveAcl(string streamId) {
297+
throw new NotImplementedException();
298+
}
299+
300+
public string GetEventStreamIdByTransactionId(long transactionId) {
301+
throw new NotImplementedException();
302+
}
303+
304+
305+
public string GetStreamName(string streamId) {
306+
throw new NotImplementedException();
307+
}
308+
309+
#endregion
310+
}
173311
}

0 commit comments

Comments
 (0)