Skip to content

Commit c7a13ae

Browse files
authored
Support client side caching with RedisCluster (redis#3102)
* sync * fix mock_node_resp * fix mock_node_resp_func * fix test_handling_cluster_failover_to_a_replica * fix test_handling_cluster_failover_to_a_replica * async cluster and cleanup tests * delete comment
1 parent b5d4d29 commit c7a13ae

File tree

9 files changed

+398
-268
lines changed

9 files changed

+398
-268
lines changed

redis/asyncio/cluster.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818
Union,
1919
)
2020

21+
from redis._cache import (
22+
DEFAULT_BLACKLIST,
23+
DEFAULT_EVICTION_POLICY,
24+
DEFAULT_WHITELIST,
25+
_LocalCache,
26+
)
2127
from redis._parsers import AsyncCommandsParser, Encoder
2228
from redis._parsers.helpers import (
2329
_RedisCallbacks,
@@ -267,6 +273,13 @@ def __init__(
267273
ssl_keyfile: Optional[str] = None,
268274
protocol: Optional[int] = 2,
269275
address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None,
276+
cache_enable: bool = False,
277+
client_cache: Optional[_LocalCache] = None,
278+
cache_max_size: int = 100,
279+
cache_ttl: int = 0,
280+
cache_eviction_policy: str = DEFAULT_EVICTION_POLICY,
281+
cache_blacklist: List[str] = DEFAULT_BLACKLIST,
282+
cache_whitelist: List[str] = DEFAULT_WHITELIST,
270283
) -> None:
271284
if db:
272285
raise RedisClusterException(
@@ -310,6 +323,14 @@ def __init__(
310323
"socket_timeout": socket_timeout,
311324
"retry": retry,
312325
"protocol": protocol,
326+
# Client cache related kwargs
327+
"cache_enable": cache_enable,
328+
"client_cache": client_cache,
329+
"cache_max_size": cache_max_size,
330+
"cache_ttl": cache_ttl,
331+
"cache_eviction_policy": cache_eviction_policy,
332+
"cache_blacklist": cache_blacklist,
333+
"cache_whitelist": cache_whitelist,
313334
}
314335

315336
if ssl:
@@ -682,7 +703,6 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
682703
:raises RedisClusterException: if target_nodes is not provided & the command
683704
can't be mapped to a slot
684705
"""
685-
kwargs.pop("keys", None) # the keys are used only for client side caching
686706
command = args[0]
687707
target_nodes = []
688708
target_nodes_specified = False
@@ -1039,16 +1059,24 @@ async def parse_response(
10391059
async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
10401060
# Acquire connection
10411061
connection = self.acquire_connection()
1062+
keys = kwargs.pop("keys", None)
10421063

1043-
# Execute command
1044-
await connection.send_packed_command(connection.pack_command(*args), False)
1045-
1046-
# Read response
1047-
try:
1048-
return await self.parse_response(connection, args[0], **kwargs)
1049-
finally:
1050-
# Release connection
1064+
response_from_cache = await connection._get_from_local_cache(args)
1065+
if response_from_cache is not None:
10511066
self._free.append(connection)
1067+
return response_from_cache
1068+
else:
1069+
# Execute command
1070+
await connection.send_packed_command(connection.pack_command(*args), False)
1071+
1072+
# Read response
1073+
try:
1074+
response = await self.parse_response(connection, args[0], **kwargs)
1075+
connection._add_to_local_cache(args, response, keys)
1076+
return response
1077+
finally:
1078+
# Release connection
1079+
self._free.append(connection)
10521080

10531081
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10541082
# Acquire connection

redis/asyncio/connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ def __init__(
227227
_cache = None
228228
self.client_cache = client_cache if client_cache is not None else _cache
229229
if self.client_cache is not None:
230+
if self.protocol not in [3, "3"]:
231+
raise RedisError(
232+
"client caching is only supported with protocol version 3 or higher"
233+
)
230234
self.cache_blacklist = cache_blacklist
231235
self.cache_whitelist = cache_whitelist
232236

redis/cluster.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,13 @@ def parse_cluster_myshardid(resp, **options):
167167
"ssl_password",
168168
"unix_socket_path",
169169
"username",
170+
"cache_enable",
171+
"client_cache",
172+
"cache_max_size",
173+
"cache_ttl",
174+
"cache_eviction_policy",
175+
"cache_blacklist",
176+
"cache_whitelist",
170177
)
171178
KWARGS_DISABLED_KEYS = ("host", "port")
172179

@@ -1060,7 +1067,6 @@ def execute_command(self, *args, **kwargs):
10601067
list<ClusterNode>
10611068
dict<Any, ClusterNode>
10621069
"""
1063-
kwargs.pop("keys", None) # the keys are used only for client side caching
10641070
target_nodes_specified = False
10651071
is_default_node = False
10661072
target_nodes = None
@@ -1119,6 +1125,7 @@ def _execute_command(self, target_node, *args, **kwargs):
11191125
"""
11201126
Send a command to a node in the cluster
11211127
"""
1128+
keys = kwargs.pop("keys", None)
11221129
command = args[0]
11231130
redis_node = None
11241131
connection = None
@@ -1147,14 +1154,18 @@ def _execute_command(self, target_node, *args, **kwargs):
11471154
connection.send_command("ASKING")
11481155
redis_node.parse_response(connection, "ASKING", **kwargs)
11491156
asking = False
1150-
1151-
connection.send_command(*args)
1152-
response = redis_node.parse_response(connection, command, **kwargs)
1153-
if command in self.cluster_response_callbacks:
1154-
response = self.cluster_response_callbacks[command](
1155-
response, **kwargs
1156-
)
1157-
return response
1157+
response_from_cache = connection._get_from_local_cache(args)
1158+
if response_from_cache is not None:
1159+
return response_from_cache
1160+
else:
1161+
connection.send_command(*args)
1162+
response = redis_node.parse_response(connection, command, **kwargs)
1163+
if command in self.cluster_response_callbacks:
1164+
response = self.cluster_response_callbacks[command](
1165+
response, **kwargs
1166+
)
1167+
connection._add_to_local_cache(args, response, keys)
1168+
return response
11581169
except AuthenticationError:
11591170
raise
11601171
except (ConnectionError, TimeoutError) as e:

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def _get_client(
275275
redis_url = request.config.getoption("--redis-url")
276276
else:
277277
redis_url = from_url
278-
if "protocol" not in redis_url:
278+
if "protocol" not in redis_url and kwargs.get("protocol") is None:
279279
kwargs["protocol"] = request.config.getoption("--protocol")
280280

281281
cluster_mode = REDIS_INFO["cluster_enabled"]

tests/test_asyncio/conftest.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,9 @@ async def client_factory(
6969
url: str = request.config.getoption("--redis-url"),
7070
cls=redis.Redis,
7171
flushdb=True,
72-
protocol=request.config.getoption("--protocol"),
7372
**kwargs,
7473
):
75-
if "protocol" not in url:
74+
if "protocol" not in url and kwargs.get("protocol") is None:
7675
kwargs["protocol"] = request.config.getoption("--protocol")
7776

7877
cluster_mode = REDIS_INFO["cluster_enabled"]

0 commit comments

Comments
 (0)