Skip to content
Prev Previous commit
Next Next commit
Added base async components
  • Loading branch information
vladvildanov committed Aug 29, 2025
commit bad9bcc32a69265cf1c5709b41b4867362e8007b
Empty file.
26 changes: 26 additions & 0 deletions redis/asyncio/multidb/circuit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from abc import abstractmethod
from typing import Callable

import pybreaker

from redis.multidb.circuit import CircuitBreaker, State, BaseCircuitBreaker, PBCircuitBreakerAdapter


class AsyncCircuitBreaker(CircuitBreaker):
"""Async implementation of Circuit Breaker interface."""

@abstractmethod
async def on_state_changed(self, cb: Callable[["CircuitBreaker", State, State], None]):
"""Callback called when the state of the circuit changes."""
pass

class AsyncPBCircuitBreakerAdapter(BaseCircuitBreaker, AsyncCircuitBreaker):
"""
Async adapter for pybreaker's CircuitBreaker implementation.
"""
def __init__(self, cb: pybreaker.CircuitBreaker):
super().__init__(cb)
self._sync_cb = PBCircuitBreakerAdapter(cb)

async def on_state_changed(self, cb: Callable[["CircuitBreaker", State, State], None]):
self._sync_cb.on_state_changed(cb)
95 changes: 95 additions & 0 deletions redis/asyncio/multidb/command_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from abc import abstractmethod
from typing import List, Optional, Callable, Any

from redis.asyncio.client import PubSub, Pipeline
from redis.asyncio.multidb.database import Databases, AsyncDatabase
from redis.asyncio.multidb.failover import AsyncFailoverStrategy
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector
from redis.asyncio.retry import Retry
from redis.multidb.command_executor import CommandExecutor


class AsyncCommandExecutor(CommandExecutor):

@property
@abstractmethod
def databases(self) -> Databases:
"""Returns a list of databases."""
pass

@property
@abstractmethod
def failure_detectors(self) -> List[AsyncFailureDetector]:
"""Returns a list of failure detectors."""
pass

@abstractmethod
def add_failure_detector(self, failure_detector: AsyncFailureDetector) -> None:
"""Adds a new failure detector to the list of failure detectors."""
pass

@property
@abstractmethod
def active_database(self) -> Optional[AsyncDatabase]:
"""Returns currently active database."""
pass

@active_database.setter
@abstractmethod
def active_database(self, database: AsyncDatabase) -> None:
"""Sets the currently active database."""
pass

@property
@abstractmethod
def active_pubsub(self) -> Optional[PubSub]:
"""Returns currently active pubsub."""
pass

@active_pubsub.setter
@abstractmethod
def active_pubsub(self, pubsub: PubSub) -> None:
"""Sets currently active pubsub."""
pass

@property
@abstractmethod
def failover_strategy(self) -> AsyncFailoverStrategy:
"""Returns failover strategy."""
pass

@property
@abstractmethod
def command_retry(self) -> Retry:
"""Returns command retry object."""
pass

@abstractmethod
async def pubsub(self, **kwargs):
"""Initializes a PubSub object on a currently active database"""
pass

@abstractmethod
async def execute_command(self, *args, **options):
"""Executes a command and returns the result."""
pass

@abstractmethod
async def execute_pipeline(self, command_stack: tuple):
"""Executes a stack of commands in pipeline."""
pass

@abstractmethod
async def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options):
"""Executes a transaction block wrapped in callback."""
pass

@abstractmethod
def execute_pubsub_method(self, method_name: str, *args, **kwargs):
"""Executes a given method on active pub/sub."""
pass

@abstractmethod
def execute_pubsub_run(self, sleep_time: float, **kwargs) -> Any:
"""Executes pub/sub run in a thread."""
pass
67 changes: 67 additions & 0 deletions redis/asyncio/multidb/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from abc import abstractmethod
from typing import Union, Optional

from redis.asyncio import Redis, RedisCluster
from redis.asyncio.multidb.circuit import AsyncCircuitBreaker
from redis.data_structure import WeightedList
from redis.multidb.database import AbstractDatabase, BaseDatabase
from redis.typing import Number


class AsyncDatabase(AbstractDatabase):
"""Database with an underlying asynchronous redis client."""
@property
@abstractmethod
def client(self) -> Union[Redis, RedisCluster]:
"""The underlying redis client."""
pass

@client.setter
@abstractmethod
def client(self, client: Union[Redis, RedisCluster]):
"""Set the underlying redis client."""
pass

@property
@abstractmethod
def circuit(self) -> AsyncCircuitBreaker:
"""Circuit breaker for the current database."""
pass

@circuit.setter
@abstractmethod
def circuit(self, circuit: AsyncCircuitBreaker):
"""Set the circuit breaker for the current database."""
pass

Databases = WeightedList[tuple[AsyncDatabase, Number]]

class Database(BaseDatabase, AsyncDatabase):
def __init__(
self,
client: Union[Redis, RedisCluster],
circuit: AsyncCircuitBreaker,
weight: float,
health_check_url: Optional[str] = None,
):
self._client = client
self._cb = circuit
self._cb.database = self
super().__init__(weight, health_check_url)

@property
def client(self) -> Union[Redis, RedisCluster]:
return self._client

@client.setter
def client(self, client: Union[Redis, RedisCluster]):
self._client = client

@property
def circuit(self) -> AsyncCircuitBreaker:
return self._cb

@circuit.setter
def circuit(self, circuit: AsyncCircuitBreaker):
self._cb = circuit

65 changes: 65 additions & 0 deletions redis/asyncio/multidb/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import List

from redis.asyncio.multidb.database import AsyncDatabase
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector
from redis.event import AsyncEventListenerInterface, AsyncOnCommandsFailEvent


class AsyncActiveDatabaseChanged:
"""
Event fired when an async active database has been changed.
"""
def __init__(
self,
old_database: AsyncDatabase,
new_database: AsyncDatabase,
command_executor,
**kwargs
):
self._old_database = old_database
self._new_database = new_database
self._command_executor = command_executor
self._kwargs = kwargs

@property
def old_database(self) -> AsyncDatabase:
return self._old_database

@property
def new_database(self) -> AsyncDatabase:
return self._new_database

@property
def command_executor(self):
return self._command_executor

@property
def kwargs(self):
return self._kwargs

class ResubscribeOnActiveDatabaseChanged(AsyncEventListenerInterface):
"""
Re-subscribe the currently active pub / sub to a new active database.
"""
async def listen(self, event: AsyncActiveDatabaseChanged):
old_pubsub = event.command_executor.active_pubsub

if old_pubsub is not None:
# Re-assign old channels and patterns so they will be automatically subscribed on connection.
new_pubsub = event.new_database.client.pubsub(**event.kwargs)
new_pubsub.channels = old_pubsub.channels
new_pubsub.patterns = old_pubsub.patterns
await new_pubsub.on_connect(None)
event.command_executor.active_pubsub = new_pubsub
await old_pubsub.close()

class RegisterCommandFailure(AsyncEventListenerInterface):
"""
Event listener that registers command failures and passing it to the failure detectors.
"""
def __init__(self, failure_detectors: List[AsyncFailureDetector]):
self._failure_detectors = failure_detectors

async def listen(self, event: AsyncOnCommandsFailEvent) -> None:
for failure_detector in self._failure_detectors:
await failure_detector.register_failure(event.exception, event.commands)
51 changes: 51 additions & 0 deletions redis/asyncio/multidb/failover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from abc import abstractmethod, ABC

from redis.asyncio.multidb.database import AsyncDatabase, Databases
from redis.multidb.circuit import State as CBState
from redis.asyncio.retry import Retry
from redis.data_structure import WeightedList
from redis.multidb.exception import NoValidDatabaseException
from redis.utils import dummy_fail_async


class AsyncFailoverStrategy(ABC):

@property
@abstractmethod
async def database(self) -> AsyncDatabase:
"""Select the database according to the strategy."""
pass

@abstractmethod
def set_databases(self, databases: Databases) -> None:
"""Set the database strategy operates on."""
pass

class WeightBasedFailoverStrategy(AsyncFailoverStrategy):
"""
Failover strategy based on database weights.
"""
def __init__(
self,
retry: Retry
):
self._retry = retry
self._retry.update_supported_errors([NoValidDatabaseException])
self._databases = WeightedList()

@property
async def database(self) -> AsyncDatabase:
return await self._retry.call_with_retry(
lambda: self._get_active_database(),
lambda _: dummy_fail_async()
)

def set_databases(self, databases: Databases) -> None:
self._databases = databases

async def _get_active_database(self) -> AsyncDatabase:
for database, _ in self._databases:
if database.circuit.state == CBState.CLOSED:
return database

raise NoValidDatabaseException('No valid database available for communication')
29 changes: 29 additions & 0 deletions redis/asyncio/multidb/failure_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod

from redis.multidb.failure_detector import FailureDetector


class AsyncFailureDetector(ABC):

@abstractmethod
async def register_failure(self, exception: Exception, cmd: tuple) -> None:
"""Register a failure that occurred during command execution."""
pass

@abstractmethod
def set_command_executor(self, command_executor) -> None:
"""Set the command executor for this failure."""
pass

class FailureDetectorAsyncWrapper(AsyncFailureDetector):
"""
Async wrapper for the failure detector.
"""
def __init__(self, failure_detector: FailureDetector) -> None:
self._failure_detector = failure_detector

async def register_failure(self, exception: Exception, cmd: tuple) -> None:
self._failure_detector.register_failure(exception, cmd)

def set_command_executor(self, command_executor) -> None:
self._failure_detector.set_command_executor(command_executor)
Loading