Skip to content

Commit 63d0f5a

Browse files
committed
wip
1 parent e56e805 commit 63d0f5a

File tree

3 files changed

+101
-23
lines changed

3 files changed

+101
-23
lines changed

src/providers/WorkflowCore.Providers.AWS/Interface/IKinesisTracker.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ namespace WorkflowCore.Providers.AWS.Interface
88
public interface IKinesisTracker
99
{
1010
Task<string> GetNextShardIterator(string app, string stream, string shard);
11+
Task<string> GetNextLastSequenceNumber(string app, string stream, string shard);
1112
Task IncrementShardIterator(string app, string stream, string shard, string iterator);
13+
Task IncrementShardIteratorAndSequence(string app, string stream, string shard, string iterator, string sequence);
1214
}
1315
}

src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,31 +68,16 @@ private async void Process()
6868

6969
try
7070
{
71-
var iterator = await _tracker.GetNextShardIterator(sub.AppName, sub.Stream, sub.Shard.ShardId);
72-
73-
if (iterator == null)
74-
{
75-
var iterResp = await _client.GetShardIteratorAsync(new GetShardIteratorRequest()
76-
{
77-
ShardId = sub.Shard.ShardId,
78-
StreamName = sub.Stream,
79-
ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER,
80-
StartingSequenceNumber = sub.Shard.SequenceNumberRange.StartingSequenceNumber
81-
});
82-
iterator = iterResp.ShardIterator;
83-
}
84-
85-
var records = await _client.GetRecordsAsync(new GetRecordsRequest()
86-
{
87-
ShardIterator = iterator,
88-
Limit = _batchSize
89-
});
71+
var records = await GetBatch(sub);
9072

9173
if (records.Records.Count == 0)
9274
sub.Snooze = DateTime.Now.AddSeconds(5);
9375

76+
var lastSequence = string.Empty;
77+
9478
foreach (var rec in records.Records)
9579
{
80+
lastSequence = rec.SequenceNumber;
9681
try
9782
{
9883
sub.Action(rec);
@@ -103,7 +88,10 @@ private async void Process()
10388
}
10489
}
10590

106-
await _tracker.IncrementShardIterator(sub.AppName, sub.Stream, sub.Shard.ShardId, records.NextShardIterator);
91+
if (lastSequence != string.Empty)
92+
await _tracker.IncrementShardIteratorAndSequence(sub.AppName, sub.Stream, sub.Shard.ShardId, records.NextShardIterator, lastSequence);
93+
else
94+
await _tracker.IncrementShardIterator(sub.AppName, sub.Stream, sub.Shard.ShardId, records.NextShardIterator);
10795
}
10896
finally
10997
{
@@ -121,6 +109,53 @@ private async void Process()
121109
}
122110
}
123111

112+
private async Task<GetRecordsResponse> GetBatch(ShardSubscription sub)
113+
{
114+
var iterator = await _tracker.GetNextShardIterator(sub.AppName, sub.Stream, sub.Shard.ShardId);
115+
116+
if (iterator == null)
117+
{
118+
var iterResp = await _client.GetShardIteratorAsync(new GetShardIteratorRequest()
119+
{
120+
ShardId = sub.Shard.ShardId,
121+
StreamName = sub.Stream,
122+
ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER,
123+
StartingSequenceNumber = sub.Shard.SequenceNumberRange.StartingSequenceNumber
124+
});
125+
iterator = iterResp.ShardIterator;
126+
}
127+
128+
try
129+
{
130+
var result = await _client.GetRecordsAsync(new GetRecordsRequest()
131+
{
132+
ShardIterator = iterator,
133+
Limit = _batchSize
134+
});
135+
136+
return result;
137+
}
138+
catch (ExpiredIteratorException)
139+
{
140+
var lastSequence = await _tracker.GetNextLastSequenceNumber(sub.AppName, sub.Stream, sub.Shard.ShardId);
141+
var iterResp = await _client.GetShardIteratorAsync(new GetShardIteratorRequest()
142+
{
143+
ShardId = sub.Shard.ShardId,
144+
StreamName = sub.Stream,
145+
ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER,
146+
StartingSequenceNumber = lastSequence
147+
});
148+
iterator = iterResp.ShardIterator;
149+
150+
var result = await _client.GetRecordsAsync(new GetRecordsRequest()
151+
{
152+
ShardIterator = iterator,
153+
Limit = _batchSize
154+
});
155+
156+
return result;
157+
}
158+
}
124159

125160
public void Dispose()
126161
{

src/providers/WorkflowCore.Providers.AWS/Services/KinesisTracker.cs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,47 @@ public async Task<string> GetNextShardIterator(string app, string stream, string
4646
return response.Item["next_iterator"].S;
4747
}
4848

49+
public async Task<string> GetNextLastSequenceNumber(string app, string stream, string shard)
50+
{
51+
if (!_tableConfirmed)
52+
await EnsureTable();
53+
54+
var response = await _client.GetItemAsync(new GetItemRequest()
55+
{
56+
TableName = _tableName,
57+
Key = new Dictionary<string, AttributeValue>
58+
{
59+
{ "id", new AttributeValue(FormatId(app, stream, shard)) }
60+
}
61+
});
62+
63+
if (!response.Item.ContainsKey("last_sequence"))
64+
return null;
65+
66+
return response.Item["last_sequence"].S;
67+
}
68+
4969
public async Task IncrementShardIterator(string app, string stream, string shard, string iterator)
70+
{
71+
if (!_tableConfirmed)
72+
await EnsureTable();
73+
74+
await _client.UpdateItemAsync(new UpdateItemRequest()
75+
{
76+
TableName = _tableName,
77+
Key = new Dictionary<string, AttributeValue>
78+
{
79+
{"id", new AttributeValue(FormatId(app, stream, shard))}
80+
},
81+
UpdateExpression = "SET next_iterator = :n",
82+
ExpressionAttributeValues = new Dictionary<string, AttributeValue>()
83+
{
84+
{ ":n" , new AttributeValue(iterator) }
85+
}
86+
});
87+
}
88+
89+
public async Task IncrementShardIteratorAndSequence(string app, string stream, string shard, string iterator, string sequence)
5090
{
5191
if (!_tableConfirmed)
5292
await EnsureTable();
@@ -56,12 +96,13 @@ await _client.PutItemAsync(new PutItemRequest()
5696
TableName = _tableName,
5797
Item = new Dictionary<string, AttributeValue>
5898
{
59-
{ "id", new AttributeValue(FormatId(app, stream, shard)) },
60-
{ "next_iterator", new AttributeValue(iterator) }
99+
{"id", new AttributeValue(FormatId(app, stream, shard))},
100+
{"next_iterator", new AttributeValue(iterator)},
101+
{"last_sequence", new AttributeValue(sequence)}
61102
}
62103
});
63104
}
64-
105+
65106
private async Task EnsureTable()
66107
{
67108
try

0 commit comments

Comments
 (0)