Skip to content

Commit

Permalink
cluster/scan_iter: fix iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarshgupta137 committed Mar 17, 2022
1 parent d0e0126 commit 6e7f052
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Add limited support for Lua scripting with RedisCluster
* Implement `.lock()` method on RedisCluster
* Fix cursor returned by SCAN for RedisCluster & change default target to PRIMARIES
* Fix scan_iter for RedisCluster

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
38 changes: 38 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import Iterator, Union

from redis.crc import key_slot
from redis.exceptions import RedisClusterException, RedisError
from redis.typing import PatternT

from .core import (
ACLCommands,
Expand Down Expand Up @@ -206,6 +209,41 @@ def stralgo(
**kwargs,
)

def scan_iter(
self,
match: Union[PatternT, None] = None,
count: Union[int, None] = None,
_type: Union[str, None] = None,
**kwargs,
) -> Iterator:
# Do the first query with cursor=0 for all nodes
cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
yield from data

cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
if cursors:
# Get nodes by name
nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}

# Iterate over each node till its cursor is 0
kwargs.pop("target_nodes", None)
while cursors:
for name, cursor in cursors.items():
cur, data = self.scan(
cursor=cursor,
match=match,
count=count,
_type=_type,
target_nodes=nodes[name],
**kwargs,
)
yield from data
cursors[name] = cur[name]

cursors = {
name: cursor for name, cursor in cursors.items() if cursor != 0
}


class RedisClusterCommands(
ClusterMultiKeyCommands,
Expand Down
24 changes: 17 additions & 7 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1810,13 +1810,23 @@ def test_cluster_scan_type(self, r):

@skip_if_server_version_lt("2.8.0")
def test_cluster_scan_iter(self, r):
r.set("a", 1)
r.set("b", 2)
r.set("c", 3)
keys = list(r.scan_iter(target_nodes="primaries"))
assert set(keys) == {b"a", b"b", b"c"}
keys = list(r.scan_iter(match="a", target_nodes="primaries"))
assert set(keys) == {b"a"}
keys_all = []
keys_1 = []
for i in range(100):
s = str(i)
r.set(s, 1)
keys_all.append(s.encode("utf-8"))
if s.startswith("1"):
keys_1.append(s.encode("utf-8"))
keys_all.sort()
keys_1.sort()

for target_nodes in ["primaries", "replicas"]:
keys = r.scan_iter(target_nodes=target_nodes)
assert sorted(keys) == keys_all

keys = r.scan_iter(match="1*", target_nodes=target_nodes)
assert sorted(keys) == keys_1

def test_cluster_randomkey(self, r):
node = r.get_node_from_key("{foo}")
Expand Down

0 comments on commit 6e7f052

Please sign in to comment.