Skip to content

Add support for Trio via AnyIO #3647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a3c35d3
Switched to asyncio_mode=strict
agronholm Mar 30, 2025
61427a8
Created initial AnyIO client
agronholm May 1, 2025
169567f
Merge branch 'master' into anyio
agronholm May 1, 2025
845ffb9
Added the test_vsets test module
agronholm May 1, 2025
7e9c513
Updated the AnyIO tests based on updated asyncio tests
agronholm May 1, 2025
61d31a0
Fixed syntax errors on Python 3.8
agronholm May 1, 2025
c97bffa
Added necessary import
agronholm May 2, 2025
bd942ee
Fixed linter errors and asyncio use in test_cluster()
agronholm May 2, 2025
1e1bbfe
Removed debugging code
agronholm May 5, 2025
f2e774c
Merge branch 'master' into anyio
agronholm May 6, 2025
cacf5bc
Fixed Python 3.8 compatibility in test_init_slots_cache_slots_collisi…
agronholm May 6, 2025
2b6c6d2
Removed "async" from the synchronous test test_aggregations_hybrid_sc…
agronholm May 6, 2025
3aece11
Fixed test_cwe
agronholm May 8, 2025
136da0f
Fixed ExceptionGroup import
agronholm May 8, 2025
dc5a3b9
Don't expect the server to shutdown TLS before closing the underlying…
agronholm May 13, 2025
0b7cfe5
Enabled CI for the anyio branch
agronholm May 13, 2025
4f22df4
Adjusted the logging level to match asyncio and sync
agronholm May 14, 2025
d44d771
Marked TestCommandsAreNotEncoded.test_basic_command() with xfail as o…
agronholm May 14, 2025
6acb590
Added pytest.mark.asyncio to asyncio test_vsets
agronholm May 14, 2025
e45a8fb
Added changelog note
agronholm May 14, 2025
2c91e81
Removed workflow changes
agronholm May 14, 2025
3d23d9e
Merge branch 'master' into anyio
agronholm May 14, 2025
daa7522
Prevent RuntimeError while reinitializing clusters
agronholm May 14, 2025
b5702ed
Fixed SSL verification as per the asyncio code
agronholm May 14, 2025
09de0dd
Added new SSL tests
agronholm May 15, 2025
b624b07
Fixed RuntimeError in asyncio test_create_single_connection_client_fr…
agronholm May 15, 2025
98574ef
Merge branch 'master' into anyio
agronholm May 19, 2025
b03e927
Added missing pytest-asyncio mark in new test
agronholm May 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Add AnyIO based async client (and by extension, add support for Trio)
* Support transactions in ClusterPipeline
* Removing support for RedisGraph module. RedisGraph support is deprecated since Redis Stack 7.2 (https://redis.com/blog/redisgraph-eol/)
* Fix lock.extend() typedef to accept float TTL extension
Expand Down Expand Up @@ -64,7 +65,7 @@
* Fix Sentinel.execute_command doesn't execute across the entire sentinel cluster bug (#2458)
* Added a replacement for the default cluster node in the event of failure (#2463)
* Fix for Unhandled exception related to self.host with unix socket (#2496)
* Improve error output for master discovery
* Improve error output for master discovery
* Make `ClusterCommandsProtocol` an actual Protocol
* Add `sum` to DUPLICATE_POLICY documentation of `TS.CREATE`, `TS.ADD` and `TS.ALTER`
* Prevent async ClusterPipeline instances from becoming "false-y" in case of empty command stack (#3061)
Expand Down
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ uvloop
vulture>=2.3.0
numpy>=1.24.0
redis-entraid==0.4.0b2
anyio[trio] >= 4.5.2
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ classifiers = [
dependencies = ['async-timeout>=4.0.3; python_full_version<"3.11.3"']

[project.optional-dependencies]
anyio = [
"anyio >= 4.5.2",
]
hiredis = [
"hiredis>=3.0.0",
]
Expand Down Expand Up @@ -68,13 +71,13 @@ markers = [
"onlycluster: marks tests to be run only with cluster mode redis",
"onlynoncluster: marks tests to be run only with standalone redis",
"ssl: marker for only the ssl tests",
"asyncio: marker for async tests",
"asyncio: marker for asyncio tests",
"replica: replica tests",
"experimental: run only experimental tests",
"cp_integration: credential provider integration tests",
]
asyncio_default_fixture_loop_scope = "function"
asyncio_mode = "auto"
asyncio_mode = "strict"
timeout = 30
filterwarnings = [
"always",
Expand Down
1 change: 1 addition & 0 deletions redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from redis import asyncio # noqa
from redis import anyio # noqa
from redis.backoff import default_backoff
from redis.client import Redis, StrictRedis
from redis.cluster import RedisCluster
Expand Down
64 changes: 64 additions & 0 deletions redis/anyio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from redis.anyio.client import Redis, StrictRedis
from redis.anyio.cluster import RedisCluster
from redis.anyio.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
)
from redis.anyio.sentinel import (
Sentinel,
SentinelConnectionPool,
SentinelManagedConnection,
SentinelManagedSSLConnection,
)
from redis.anyio.utils import from_url
from redis.backoff import default_backoff
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
BusyLoadingError,
ChildDeadlockedError,
ConnectionError,
DataError,
InvalidResponse,
OutOfMemoryError,
PubSubError,
ReadOnlyError,
RedisError,
ResponseError,
TimeoutError,
WatchError,
)

__all__ = [
"AuthenticationError",
"AuthenticationWrongNumberOfArgsError",
"BlockingConnectionPool",
"BusyLoadingError",
"ChildDeadlockedError",
"Connection",
"ConnectionError",
"ConnectionPool",
"DataError",
"from_url",
"default_backoff",
"InvalidResponse",
"PubSubError",
"OutOfMemoryError",
"ReadOnlyError",
"Redis",
"RedisCluster",
"RedisError",
"ResponseError",
"Sentinel",
"SentinelConnectionPool",
"SentinelManagedConnection",
"SentinelManagedSSLConnection",
"SSLConnection",
"StrictRedis",
"TimeoutError",
"UnixDomainSocketConnection",
"WatchError",
]
Empty file.
209 changes: 209 additions & 0 deletions redis/anyio/_commands/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import (
Any,
AsyncIterator,
Iterable,
List,
Mapping,
Optional,
)

from ...commands.cluster import (
READ_COMMANDS,
ClusterDataAccessCommands,
ClusterManagementCommands,
ClusterMultiKeyCommands,
)
from ...commands.core import (
AsyncACLCommands,
AsyncDataAccessCommands,
AsyncFunctionCommands,
AsyncManagementCommands,
AsyncModuleCommands,
AsyncScriptCommands,
)
from ...commands.helpers import list_or_args
from ...commands.redismodules import AsyncRedisModuleCommands
from ...typing import AnyKeyT, EncodableT, KeysT, KeyT, PatternT
from ..utils import gather


class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
"""
A class containing commands that handle more than one key
"""

async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
"""
Splits the keys into different slots and then calls MGET
for the keys of every slot. This operation will not be atomic
if keys belong to more than one slot.

Returns a list of values ordered identically to ``keys``

For more information see https://redis.io/commands/mget
"""

# Concatenate all keys into a list
keys = list_or_args(keys, args)

# Split keys into slots
slots_to_keys = self._partition_keys_by_slot(keys)

# Execute commands using a pipeline
res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

# Reorder keys in the order the user provided & return
return self._reorder_keys_by_command(keys, slots_to_keys, res)

async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
"""
Sets key/values based on a mapping. Mapping is a dictionary of
key/value pairs. Both keys and values should be strings or types that
can be cast to a string via str().

Splits the keys into different slots and then calls MSET
for the keys of every slot. This operation will not be atomic
if keys belong to more than one slot.

For more information see https://redis.io/commands/mset
"""

# Partition the keys by slot
slots_to_pairs = self._partition_pairs_by_slot(mapping)

# Execute commands using a pipeline & return list of replies
return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
"""
Runs the given command once for the keys
of each slot. Returns the sum of the return values.
"""

# Partition the keys by slot
slots_to_keys = self._partition_keys_by_slot(keys)

# Sum up the reply from each command
return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

async def _execute_pipeline_by_slot(
self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
) -> List[Any]:
read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
pipe = self.pipeline()
[
pipe.execute_command(
command,
*slot_args,
target_nodes=[
self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
],
)
for slot, slot_args in slots_to_args.items()
]
return await pipe.execute()


class AsyncClusterManagementCommands(
ClusterManagementCommands, AsyncManagementCommands
):
"""
A class for Redis Cluster management commands

The class inherits from Redis's core ManagementCommands class and do the
required adjustments to work with cluster mode
"""

async def cluster_delslots(self, *slots: EncodableT) -> Sequence[bool]:
"""
Set hash slots as unbound in the cluster.
It determines by it self what node the slot is in and sends it there

Returns a list of the results for each processed slot.

For more information see https://redis.io/commands/cluster-delslots
"""
return await gather(
*(self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots)
)


class AsyncClusterDataAccessCommands(
ClusterDataAccessCommands, AsyncDataAccessCommands
):
"""
A class for Redis Cluster Data Access Commands

The class inherits from Redis's core DataAccessCommand class and do the
required adjustments to work with cluster mode
"""

async def scan_iter(
self,
match: Optional[PatternT] = None,
count: Optional[int] = None,
_type: Optional[str] = None,
**kwargs,
) -> AsyncIterator:
# Do the first query with cursor=0 for all nodes
cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
for value in data:
yield value

cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
if cursors:
# Get nodes by name
nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}

# Iterate over each node till its cursor is 0
kwargs.pop("target_nodes", None)
while cursors:
for name, cursor in cursors.items():
cur, data = await self.scan(
cursor=cursor,
match=match,
count=count,
_type=_type,
target_nodes=nodes[name],
**kwargs,
)
for value in data:
yield value
cursors[name] = cur[name]

cursors = {
name: cursor for name, cursor in cursors.items() if cursor != 0
}


class AsyncRedisClusterCommands(
AsyncClusterMultiKeyCommands,
AsyncClusterManagementCommands,
AsyncACLCommands,
AsyncClusterDataAccessCommands,
AsyncScriptCommands,
AsyncFunctionCommands,
AsyncModuleCommands,
AsyncRedisModuleCommands,
):
"""
A class for all Redis Cluster commands

For key-based commands, the target node(s) will be internally determined
by the keys' hash slot.
Non-key-based commands can be executed with the 'target_nodes' argument to
target specific nodes. By default, if target_nodes is not specified, the
command will be executed on the default cluster node.

:param :target_nodes: type can be one of the followings:
- nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
- 'ClusterNode'
- 'list(ClusterNodes)'
- 'dict(any:clusterNodes)'

for example:
r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
"""
11 changes: 11 additions & 0 deletions redis/anyio/_parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .base import _AnyIORESPBase
from .hiredis import _AnyIOHiredisParser
from .resp2 import _AnyIORESP2Parser
from .resp3 import _AnyIORESP3Parser

__all__ = [
"_AnyIORESPBase",
"_AnyIOHiredisParser",
"_AnyIORESP2Parser",
"_AnyIORESP3Parser",
]
Loading
Loading