Skip to content

Commit c54c159

Browse files
robhopmgravell
andauthored
StreamGroupInfo.Lag can be null (#2902)
* TryRead for nullable long, tests on null value for StreamConsumerGroupInfo.Lag * Update StreamTests.cs make tests async * Update StreamTests.cs tyop --------- Co-authored-by: Marc Gravell <marc.gravell@gmail.com>
1 parent afd66ef commit c54c159

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2289,6 +2289,19 @@ internal static bool TryRead(Sequence<RawResult> pairs, in CommandBytes key, ref
22892289
}
22902290
return false;
22912291
}
2292+
internal static bool TryRead(Sequence<RawResult> pairs, in CommandBytes key, ref long? value)
2293+
{
2294+
var len = pairs.Length / 2;
2295+
for (int i = 0; i < len; i++)
2296+
{
2297+
if (pairs[i * 2].IsEqual(key) && pairs[(i * 2) + 1].TryGetInt64(out var tmp))
2298+
{
2299+
value = tmp;
2300+
return true;
2301+
}
2302+
}
2303+
return false;
2304+
}
22922305

22932306
internal static bool TryRead(Sequence<RawResult> pairs, in CommandBytes key, ref int value)
22942307
{
@@ -2351,7 +2364,8 @@ protected override StreamGroupInfo ParseItem(in RawResult result)
23512364
var arr = result.GetItems();
23522365
string? name = default, lastDeliveredId = default;
23532366
int consumerCount = default, pendingMessageCount = default;
2354-
long entriesRead = default, lag = default;
2367+
long entriesRead = default;
2368+
long? lag = default;
23552369

23562370
KeyValuePairParser.TryRead(arr, KeyValuePairParser.Name, ref name);
23572371
KeyValuePairParser.TryRead(arr, KeyValuePairParser.Consumers, ref consumerCount);

tests/StackExchange.Redis.Tests/StreamTests.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2036,4 +2036,43 @@ await db.StreamAddAsync(
20362036
Assert.Equal(123, (int)obj!.id);
20372037
Assert.Equal("test", (string)obj.name);
20382038
}
2039+
2040+
[Fact]
2041+
public async Task StreamConsumerGroupInfoLagIsNull()
2042+
{
2043+
await using var conn = Create(require: RedisFeatures.v5_0_0);
2044+
2045+
var db = conn.GetDatabase();
2046+
var key = Me();
2047+
const string groupName = "test_group",
2048+
consumer = "consumer";
2049+
2050+
await db.StreamCreateConsumerGroupAsync(key, groupName);
2051+
await db.StreamReadGroupAsync(key, groupName, consumer, "0-0", 1);
2052+
await db.StreamAddAsync(key, "field1", "value1");
2053+
await db.StreamAddAsync(key, "field1", "value1");
2054+
2055+
var streamInfo = await db.StreamInfoAsync(key);
2056+
await db.StreamDeleteAsync(key, new[] { streamInfo.LastEntry.Id });
2057+
2058+
Assert.Null((await db.StreamGroupInfoAsync(key))[0].Lag);
2059+
}
2060+
2061+
[Fact]
2062+
public async Task StreamConsumerGroupInfoLagIsTwo()
2063+
{
2064+
await using var conn = Create(require: RedisFeatures.v5_0_0);
2065+
2066+
var db = conn.GetDatabase();
2067+
var key = Me();
2068+
const string groupName = "test_group",
2069+
consumer = "consumer";
2070+
2071+
await db.StreamCreateConsumerGroupAsync(key, groupName);
2072+
await db.StreamReadGroupAsync(key, groupName, consumer, "0-0", 1);
2073+
await db.StreamAddAsync(key, "field1", "value1");
2074+
await db.StreamAddAsync(key, "field1", "value1");
2075+
2076+
Assert.Equal(2, (await db.StreamGroupInfoAsync(key))[0].Lag);
2077+
}
20392078
}

0 commit comments

Comments
 (0)