Skip to content

Commit 04e4f90

Browse files
committed
fix: update reorg tests to set ranges_complete=True for proper state tracking
The recent microbatch processing changes require ranges_complete=True for batches to be tracked in the state store. This fixes all reorg handling tests by ensuring test batches are properly marked as complete, allowing the reorg deletion logic to find and remove the appropriate data. - Updated 16 reorg-related tests across 4 loader implementations - All test batches now set ranges_complete=True in BatchMetadata - Ensures accurate testing of real-world reorg handling behavior
1 parent 7882a4b commit 04e4f90

File tree

4 files changed

+167
-41
lines changed

4 files changed

+167
-41
lines changed

tests/integration/test_deltalake_loader.py

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -586,15 +586,24 @@ def test_handle_reorg_single_network(self, delta_temp_config):
586586
# Create response batches with hashes
587587
response1 = ResponseBatch.data_batch(
588588
data=batch1,
589-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc')]),
589+
metadata=BatchMetadata(
590+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc')],
591+
ranges_complete=True, # Mark as complete so it gets tracked in state store
592+
),
590593
)
591594
response2 = ResponseBatch.data_batch(
592595
data=batch2,
593-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef')]),
596+
metadata=BatchMetadata(
597+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef')],
598+
ranges_complete=True, # Mark as complete so it gets tracked in state store
599+
),
594600
)
595601
response3 = ResponseBatch.data_batch(
596602
data=batch3,
597-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=200, end=210, hash='0x123')]),
603+
metadata=BatchMetadata(
604+
ranges=[BlockRange(network='ethereum', start=200, end=210, hash='0x123')],
605+
ranges_complete=True, # Mark as complete so it gets tracked in state store
606+
),
598607
)
599608

600609
# Load via streaming API
@@ -637,19 +646,31 @@ def test_handle_reorg_multi_network(self, delta_temp_config):
637646
# Create response batches with network-specific ranges
638647
response1 = ResponseBatch.data_batch(
639648
data=batch1,
640-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xaaa')]),
649+
metadata=BatchMetadata(
650+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xaaa')],
651+
ranges_complete=True, # Mark as complete so it gets tracked in state store
652+
),
641653
)
642654
response2 = ResponseBatch.data_batch(
643655
data=batch2,
644-
metadata=BatchMetadata(ranges=[BlockRange(network='polygon', start=100, end=110, hash='0xbbb')]),
656+
metadata=BatchMetadata(
657+
ranges=[BlockRange(network='polygon', start=100, end=110, hash='0xbbb')],
658+
ranges_complete=True, # Mark as complete so it gets tracked in state store
659+
),
645660
)
646661
response3 = ResponseBatch.data_batch(
647662
data=batch3,
648-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xccc')]),
663+
metadata=BatchMetadata(
664+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xccc')],
665+
ranges_complete=True, # Mark as complete so it gets tracked in state store
666+
),
649667
)
650668
response4 = ResponseBatch.data_batch(
651669
data=batch4,
652-
metadata=BatchMetadata(ranges=[BlockRange(network='polygon', start=150, end=160, hash='0xddd')]),
670+
metadata=BatchMetadata(
671+
ranges=[BlockRange(network='polygon', start=150, end=160, hash='0xddd')],
672+
ranges_complete=True, # Mark as complete so it gets tracked in state store
673+
),
653674
)
654675

655676
# Load via streaming API
@@ -689,15 +710,24 @@ def test_handle_reorg_overlapping_ranges(self, delta_temp_config):
689710
# Batch 3: 170-190 (after reorg, but should be deleted as 170 >= 150)
690711
response1 = ResponseBatch.data_batch(
691712
data=batch1,
692-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=90, end=110, hash='0xaaa')]),
713+
metadata=BatchMetadata(
714+
ranges=[BlockRange(network='ethereum', start=90, end=110, hash='0xaaa')],
715+
ranges_complete=True, # Mark as complete so it gets tracked in state store
716+
),
693717
)
694718
response2 = ResponseBatch.data_batch(
695719
data=batch2,
696-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=140, end=160, hash='0xbbb')]),
720+
metadata=BatchMetadata(
721+
ranges=[BlockRange(network='ethereum', start=140, end=160, hash='0xbbb')],
722+
ranges_complete=True, # Mark as complete so it gets tracked in state store
723+
),
697724
)
698725
response3 = ResponseBatch.data_batch(
699726
data=batch3,
700-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=170, end=190, hash='0xccc')]),
727+
metadata=BatchMetadata(
728+
ranges=[BlockRange(network='ethereum', start=170, end=190, hash='0xccc')],
729+
ranges_complete=True, # Mark as complete so it gets tracked in state store
730+
),
701731
)
702732

703733
# Load via streaming API
@@ -733,15 +763,24 @@ def test_handle_reorg_version_history(self, delta_temp_config):
733763

734764
response1 = ResponseBatch.data_batch(
735765
data=batch1,
736-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=0, end=10, hash='0xaaa')]),
766+
metadata=BatchMetadata(
767+
ranges=[BlockRange(network='ethereum', start=0, end=10, hash='0xaaa')],
768+
ranges_complete=True, # Mark as complete so it gets tracked in state store
769+
),
737770
)
738771
response2 = ResponseBatch.data_batch(
739772
data=batch2,
740-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=50, end=60, hash='0xbbb')]),
773+
metadata=BatchMetadata(
774+
ranges=[BlockRange(network='ethereum', start=50, end=60, hash='0xbbb')],
775+
ranges_complete=True, # Mark as complete so it gets tracked in state store
776+
),
741777
)
742778
response3 = ResponseBatch.data_batch(
743779
data=batch3,
744-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xccc')]),
780+
metadata=BatchMetadata(
781+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xccc')],
782+
ranges_complete=True, # Mark as complete so it gets tracked in state store
783+
),
745784
)
746785

747786
# Load via streaming API
@@ -792,12 +831,18 @@ def test_streaming_with_reorg(self, delta_temp_config):
792831
# Create response batches using factory methods (with hashes for proper state management)
793832
response1 = ResponseBatch.data_batch(
794833
data=data1,
795-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc123')]),
834+
metadata=BatchMetadata(
835+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc123')],
836+
ranges_complete=True, # Mark as complete so it gets tracked in state store
837+
),
796838
)
797839

798840
response2 = ResponseBatch.data_batch(
799841
data=data2,
800-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef456')]),
842+
metadata=BatchMetadata(
843+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef456')],
844+
ranges_complete=True, # Mark as complete so it gets tracked in state store
845+
),
801846
)
802847

803848
# Simulate reorg event using factory method

tests/integration/test_lmdb_loader.py

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -411,15 +411,24 @@ def test_handle_reorg_single_network(self, lmdb_config):
411411
# Create response batches with hashes
412412
response1 = ResponseBatch.data_batch(
413413
data=batch1,
414-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc')]),
414+
metadata=BatchMetadata(
415+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc')],
416+
ranges_complete=True, # Mark as complete so it gets tracked in state store
417+
),
415418
)
416419
response2 = ResponseBatch.data_batch(
417420
data=batch2,
418-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef')]),
421+
metadata=BatchMetadata(
422+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef')],
423+
ranges_complete=True, # Mark as complete so it gets tracked in state store
424+
),
419425
)
420426
response3 = ResponseBatch.data_batch(
421427
data=batch3,
422-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=200, end=210, hash='0x123')]),
428+
metadata=BatchMetadata(
429+
ranges=[BlockRange(network='ethereum', start=200, end=210, hash='0x123')],
430+
ranges_complete=True, # Mark as complete so it gets tracked in state store
431+
),
423432
)
424433

425434
# Load via streaming API
@@ -468,19 +477,31 @@ def test_handle_reorg_multi_network(self, lmdb_config):
468477
# Create response batches with network-specific ranges
469478
response1 = ResponseBatch.data_batch(
470479
data=batch1,
471-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xaaa')]),
480+
metadata=BatchMetadata(
481+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xaaa')],
482+
ranges_complete=True, # Mark as complete so it gets tracked in state store
483+
),
472484
)
473485
response2 = ResponseBatch.data_batch(
474486
data=batch2,
475-
metadata=BatchMetadata(ranges=[BlockRange(network='polygon', start=100, end=110, hash='0xbbb')]),
487+
metadata=BatchMetadata(
488+
ranges=[BlockRange(network='polygon', start=100, end=110, hash='0xbbb')],
489+
ranges_complete=True, # Mark as complete so it gets tracked in state store
490+
),
476491
)
477492
response3 = ResponseBatch.data_batch(
478493
data=batch3,
479-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xccc')]),
494+
metadata=BatchMetadata(
495+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xccc')],
496+
ranges_complete=True, # Mark as complete so it gets tracked in state store
497+
),
480498
)
481499
response4 = ResponseBatch.data_batch(
482500
data=batch4,
483-
metadata=BatchMetadata(ranges=[BlockRange(network='polygon', start=150, end=160, hash='0xddd')]),
501+
metadata=BatchMetadata(
502+
ranges=[BlockRange(network='polygon', start=150, end=160, hash='0xddd')],
503+
ranges_complete=True, # Mark as complete so it gets tracked in state store
504+
),
484505
)
485506

486507
# Load via streaming API
@@ -524,15 +545,24 @@ def test_handle_reorg_overlapping_ranges(self, lmdb_config):
524545
# Batch 3: 170-190 (after reorg, but should be deleted as 170 >= 150)
525546
response1 = ResponseBatch.data_batch(
526547
data=batch1,
527-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=90, end=110, hash='0xaaa')]),
548+
metadata=BatchMetadata(
549+
ranges=[BlockRange(network='ethereum', start=90, end=110, hash='0xaaa')],
550+
ranges_complete=True, # Mark as complete so it gets tracked in state store
551+
),
528552
)
529553
response2 = ResponseBatch.data_batch(
530554
data=batch2,
531-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=140, end=160, hash='0xbbb')]),
555+
metadata=BatchMetadata(
556+
ranges=[BlockRange(network='ethereum', start=140, end=160, hash='0xbbb')],
557+
ranges_complete=True, # Mark as complete so it gets tracked in state store
558+
),
532559
)
533560
response3 = ResponseBatch.data_batch(
534561
data=batch3,
535-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=170, end=190, hash='0xccc')]),
562+
metadata=BatchMetadata(
563+
ranges=[BlockRange(network='ethereum', start=170, end=190, hash='0xccc')],
564+
ranges_complete=True, # Mark as complete so it gets tracked in state store
565+
),
536566
)
537567

538568
# Load via streaming API
@@ -577,12 +607,18 @@ def test_streaming_with_reorg(self, lmdb_config):
577607
# Create response batches using factory methods (with hashes for proper state management)
578608
response1 = ResponseBatch.data_batch(
579609
data=data1,
580-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc123')]),
610+
metadata=BatchMetadata(
611+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc123')],
612+
ranges_complete=True, # Mark as complete so it gets tracked in state store
613+
),
581614
)
582615

583616
response2 = ResponseBatch.data_batch(
584617
data=data2,
585-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef456')]),
618+
metadata=BatchMetadata(
619+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef456')],
620+
ranges_complete=True, # Mark as complete so it gets tracked in state store
621+
),
586622
)
587623

588624
# Simulate reorg event using factory method

tests/integration/test_postgresql_loader.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -541,19 +541,31 @@ def test_handle_reorg_deletion(self, postgresql_test_config, test_table_name, cl
541541
# Create response batches with hashes
542542
response1 = ResponseBatch.data_batch(
543543
data=batch1,
544-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=102, hash='0xaaa')]),
544+
metadata=BatchMetadata(
545+
ranges=[BlockRange(network='ethereum', start=100, end=102, hash='0xaaa')],
546+
ranges_complete=True, # Mark as complete so it gets tracked in state store
547+
),
545548
)
546549
response2 = ResponseBatch.data_batch(
547550
data=batch2,
548-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=103, end=104, hash='0xbbb')]),
551+
metadata=BatchMetadata(
552+
ranges=[BlockRange(network='ethereum', start=103, end=104, hash='0xbbb')],
553+
ranges_complete=True, # Mark as complete so it gets tracked in state store
554+
),
549555
)
550556
response3 = ResponseBatch.data_batch(
551557
data=batch3,
552-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=105, end=106, hash='0xccc')]),
558+
metadata=BatchMetadata(
559+
ranges=[BlockRange(network='ethereum', start=105, end=106, hash='0xccc')],
560+
ranges_complete=True, # Mark as complete so it gets tracked in state store
561+
),
553562
)
554563
response4 = ResponseBatch.data_batch(
555564
data=batch4,
556-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=107, end=108, hash='0xddd')]),
565+
metadata=BatchMetadata(
566+
ranges=[BlockRange(network='ethereum', start=107, end=108, hash='0xddd')],
567+
ranges_complete=True, # Mark as complete so it gets tracked in state store
568+
),
557569
)
558570

559571
# Load via streaming API
@@ -605,7 +617,10 @@ def test_reorg_with_overlapping_ranges(self, postgresql_test_config, test_table_
605617

606618
response = ResponseBatch.data_batch(
607619
data=batch,
608-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=175, hash='0xaaa')]),
620+
metadata=BatchMetadata(
621+
ranges=[BlockRange(network='ethereum', start=150, end=175, hash='0xaaa')],
622+
ranges_complete=True, # Mark as complete so it gets tracked in state store
623+
),
609624
)
610625

611626
# Load via streaming API
@@ -658,11 +673,17 @@ def test_reorg_preserves_different_networks(self, postgresql_test_config, test_t
658673

659674
response_eth = ResponseBatch.data_batch(
660675
data=batch_eth,
661-
metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=100, hash='0xaaa')]),
676+
metadata=BatchMetadata(
677+
ranges=[BlockRange(network='ethereum', start=100, end=100, hash='0xaaa')],
678+
ranges_complete=True, # Mark as complete so it gets tracked in state store
679+
),
662680
)
663681
response_poly = ResponseBatch.data_batch(
664682
data=batch_poly,
665-
metadata=BatchMetadata(ranges=[BlockRange(network='polygon', start=100, end=100, hash='0xbbb')]),
683+
metadata=BatchMetadata(
684+
ranges=[BlockRange(network='polygon', start=100, end=100, hash='0xbbb')],
685+
ranges_complete=True, # Mark as complete so it gets tracked in state store
686+
),
666687
)
667688

668689
# Load both batches via streaming API

0 commit comments

Comments
 (0)