Skip to content

Issue with PendingWrite(s) and interruptions  #68

Closed
@tylerhutcherson

Description

@tylerhutcherson
poetry run pytest tests/test_interruption.py::test_aput_writes_interruption

Fails with:

___________________________________________________________ test_aput_writes_interruption ___________________________________________________________

redis_url = 'redis://0.0.0.0:55176'

    @pytest.mark.asyncio
    async def test_aput_writes_interruption(redis_url: str) -> None:
        """Test interruption during aput_writes operation."""
        # Create test data
        config, checkpoint, metadata, new_versions = create_test_checkpoint()
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = checkpoint["id"]
    
        # Successfully save a checkpoint first
        async with AsyncRedisSaver.from_conn_string(redis_url) as saver:
            next_config = await saver.aput(config, checkpoint, metadata, new_versions)
    
            # Now create a saver that will interrupt during pipeline execution
            mock_redis = MockRedis(saver._redis, "Pipeline.execute")
            mock_redis.interrupt_after_count["Pipeline.execute"] = 1
    
            # Replace the Redis client with our mock
            original_redis = saver._redis
            saver._redis = mock_redis
    
            try:
                # Try to save writes, expect interruption
                with pytest.raises(InterruptionError):
                    await saver.aput_writes(
                        next_config,
                        [("channel1", "value1"), ("channel2", "value2")],
                        "task_id_1",
                    )
    
                # Restore original Redis client to verify state
                saver._redis = original_redis
    
                # Verify that no writes were saved due to transaction abort
                checkpoint_tuple = await saver.aget_tuple(next_config)
    
                # Either there are no pending writes or they are not the ones we tried to save
                if checkpoint_tuple and checkpoint_tuple.pending_writes:
                    for write in checkpoint_tuple.pending_writes:
                        assert write.channel not in [
                            "channel1",
                            "channel2",
>                       ], "Transaction should have been rolled back"
E                       AttributeError: 'tuple' object has no attribute 'channel'

Which seems to be tied to a change in PendingWrite data model? And so now it expects a tuple here. If you update the offending like to write[1] like in other places in the test suite, it fails now on a different issue:

                # Either there are no pending writes or they are not the ones we tried to save
                if checkpoint_tuple and checkpoint_tuple.pending_writes:
                    for write in checkpoint_tuple.pending_writes:
>                       assert write[1] not in [
                            "channel1",
                            "channel2",
                        ], "Transaction should have been rolled back"
E                       AssertionError: Transaction should have been rolled back
E                       assert 'channel1' not in ['channel1', 'channel2']

Metadata

Metadata

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions