Skip to content

Commit 8636709

Browse files
committed
Address review feedback for LMDB state store
- Move class attribute after docstring for proper documentation tooling - Simplify metadata to only store end_block and end_hash (remove unused start_parent_hash) - Return False for empty batch_ids in is_processed (align with InMemoryStreamStateStore) - Remove prev_hash from resume watermark BlockRange (was semantically incorrect) - Add comment explaining resume watermark start=end=end_block pattern
1 parent ef69741 commit 8636709

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

src/amp/streaming/lmdb_state.py

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818

1919

2020
class LMDBStreamStateStore(StreamStateStore):
21-
env: lmdb.Environment
2221
"""
2322
Generic LMDB-based state store for tracking processed batches.
2423
@@ -39,9 +38,11 @@ class LMDBStreamStateStore(StreamStateStore):
3938
4039
Metadata database layout:
4140
- Key: {connection_name}|{table_name}|{network}
42-
- Value: JSON with {end_block, end_hash, start_parent_hash} (max processed block)
41+
- Value: JSON with {end_block, end_hash} (max processed block for resume)
4342
"""
4443

44+
env: lmdb.Environment
45+
4546
def __init__(
4647
self,
4748
connection_name: str,
@@ -95,12 +96,11 @@ def _serialize_batch(self, batch: BatchIdentifier) -> bytes:
9596
}
9697
return json.dumps(batch_value_dict).encode('utf-8')
9798

98-
def _serialize_metadata(self, end_block: int, end_hash: str, start_parent_hash: str) -> bytes:
99+
def _serialize_metadata(self, end_block: int, end_hash: str) -> bytes:
99100
"""Serialize metadata to JSON bytes."""
100101
meta_value_dict = {
101102
'end_block': end_block,
102103
'end_hash': end_hash,
103-
'start_parent_hash': start_parent_hash,
104104
}
105105
return json.dumps(meta_value_dict).encode('utf-8')
106106

@@ -121,7 +121,7 @@ def is_processed(self, connection_name: str, table_name: str, batch_ids: List[Ba
121121
True only if ALL batches are already processed
122122
"""
123123
if not batch_ids:
124-
return True
124+
return False
125125

126126
with self.env.begin(db=self.batches_db) as txn:
127127
for batch_id in batch_ids:
@@ -161,7 +161,7 @@ def mark_processed(self, connection_name: str, table_name: str, batch_ids: List[
161161
should_update = True
162162

163163
if should_update:
164-
meta_value = self._serialize_metadata(batch.end_block, batch.end_hash, batch.start_parent_hash)
164+
meta_value = self._serialize_metadata(batch.end_block, batch.end_hash)
165165
txn.put(meta_key, meta_value, db=self.metadata_db)
166166

167167
self.logger.debug(f'Marked {len(batch_ids)} batches as processed in {table_name}')
@@ -205,13 +205,15 @@ def get_resume_position(
205205
_, _, network = self._parse_key(key)
206206
meta_data = self._deserialize_batch(value)
207207

208+
# BlockRange here represents a resume watermark (open-ended), not a processed range.
209+
# start=end=end_block means "resume streaming from this block onwards".
210+
# See ReorgAwareStream for how this watermark is used for crash recovery.
208211
ranges.append(
209212
BlockRange(
210213
network=network,
211214
start=meta_data['end_block'],
212215
end=meta_data['end_block'],
213216
hash=meta_data.get('end_hash'),
214-
prev_hash=meta_data.get('start_parent_hash'),
215217
)
216218
)
217219

@@ -298,9 +300,7 @@ def invalidate_from_block(
298300
if remaining_batches:
299301
remaining_batches.sort(key=lambda b: b['end_block'])
300302
max_batch = remaining_batches[-1]
301-
meta_value = self._serialize_metadata(
302-
max_batch['end_block'], max_batch.get('end_hash'), max_batch.get('start_parent_hash')
303-
)
303+
meta_value = self._serialize_metadata(max_batch['end_block'], max_batch.get('end_hash'))
304304
txn.put(meta_key, meta_value, db=self.metadata_db)
305305
else:
306306
txn.delete(meta_key, db=self.metadata_db)

tests/unit/test_lmdb_state.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ def test_mark_and_check_processed(self, lmdb_store):
4747

4848
def test_is_processed_empty_list(self, lmdb_store):
4949
result = lmdb_store.is_processed('conn1', 'table1', [])
50-
assert result is True
50+
assert result is False
5151

5252
def test_multiple_batches_all_must_be_processed(self, lmdb_store):
5353
batch_id1 = BatchIdentifier('ethereum', 100, 200, '0xabc')
@@ -106,7 +106,6 @@ def test_get_resume_position_single_network(self, lmdb_store):
106106
assert watermark.ranges[0].network == 'ethereum'
107107
assert watermark.ranges[0].end == 400
108108
assert watermark.ranges[0].hash == '0x123'
109-
assert watermark.ranges[0].prev_hash == '0xparent3'
110109

111110
def test_get_resume_position_multiple_networks(self, lmdb_store):
112111
eth_batch = BatchIdentifier('ethereum', 100, 200, '0xabc', '0xeth_parent')
@@ -285,7 +284,6 @@ def test_resume_position_with_many_out_of_order_batches(self, lmdb_store):
285284
watermark = lmdb_store.get_resume_position('conn1', 'table1')
286285
assert watermark.ranges[0].end == 800
287286
assert watermark.ranges[0].hash == '0x9'
288-
assert watermark.ranges[0].prev_hash == '0xp9'
289287

290288

291289
class TestIntegrationScenarios:

0 commit comments

Comments (0)