Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scan iter bug: dev branch #2

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4397ec4
fix scan iter command issued to different replicas
agnesnatasya Apr 26, 2024
3d5e674
add tests
agnesnatasya Apr 26, 2024
b9d2338
reorder
agnesnatasya Apr 27, 2024
3c1b372
remove ignore
agnesnatasya Apr 27, 2024
cadd357
lint and format
agnesnatasya Apr 27, 2024
e2aa893
better inline
agnesnatasya Apr 29, 2024
1758a9b
backward compatible typing
agnesnatasya Apr 29, 2024
fac2ecd
test inline docs
agnesnatasya May 10, 2024
6751a04
add tests for all scan iter family
agnesnatasya May 10, 2024
f667e34
lint
agnesnatasya May 10, 2024
847ea71
implement in sync client
agnesnatasya Jul 9, 2024
93a2b91
more features for ConnectionsINdexer
agnesnatasya Jul 14, 2024
2f3c887
add _same_addres methods for sentinels
agnesnatasya Jul 14, 2024
992d192
fix connect_to args
agnesnatasya Jul 14, 2024
8d6ca1f
fix tests
agnesnatasya Jul 14, 2024
b2d9d93
add self
agnesnatasya Jul 14, 2024
5838500
convert ConnectionsIndexer to list before indexing
agnesnatasya Jul 14, 2024
e101bdf
convert ConnectionsIndexer to list before indexing
agnesnatasya Jul 14, 2024
96222dd
fix typo
agnesnatasya Jul 14, 2024
3482723
fix
agnesnatasya Jul 14, 2024
ae1b09a
fix connect_to_address
agnesnatasya Jul 14, 2024
6886f71
cleanup in sync client
agnesnatasya Jul 16, 2024
52a1d58
rename kwargs to no underscore for consistency
agnesnatasya Jul 16, 2024
b37fa0c
add cleanup tests for pipeline
agnesnatasya Jul 16, 2024
2f9964e
remove test for pipeline
agnesnatasya Jul 16, 2024
f436f60
lints
agnesnatasya Jul 16, 2024
a2ed1ac
reformat
agnesnatasya Jul 16, 2024
a9f2160
def cleanup in base class
agnesnatasya Jul 16, 2024
6940526
fix some tests
agnesnatasya Jul 16, 2024
e8c7a8b
rename iter_req_id properly
agnesnatasya Jul 16, 2024
dda3b61
fix tests
agnesnatasya Jul 16, 2024
e98c770
set fix address as a property of SentinelManagedConnection
agnesnatasya Jul 19, 2024
e32df58
lint
agnesnatasya Jul 20, 2024
08d3428
make mock class have same behavior as actual class
agnesnatasya Jul 20, 2024
d1db9f6
define _connect_to_sentinel in async server
agnesnatasya Jul 20, 2024
4c59821
mock can_read_destructive for parser
agnesnatasya Jul 20, 2024
25777cf
skip test sentinel managed connection if hirediswq
agnesnatasya Jul 20, 2024
0ee1b85
undo ensure_connection deduplication in BlockingConnectionPool
agnesnatasya Jul 20, 2024
5edff2b
import HIREDIS
agnesnatasya Jul 20, 2024
8d9c735
polymorphism for reset available connections instead
agnesnatasya Jul 20, 2024
c6c7bf7
merge
agnesnatasya Jul 20, 2024
868b499
lint
agnesnatasya Jul 20, 2024
5e249fd
fix inline comments + rename
agnesnatasya Jul 20, 2024
8643185
lint
agnesnatasya Jul 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lints
  • Loading branch information
agnesnatasya committed Jul 16, 2024
commit f436f60fcd5ec7ff795e6bd946909b139c57f25f
21 changes: 13 additions & 8 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ async def _connect_retry(self, same_address: bool = False):
if self._reader:
return # already connected
# If same_server is True, it means that the connection
# is not rotating to the next slave (if the connection pool is not master)
# is not rotating to the next slave (if the connection pool is in replica mode)
if same_address:
self.connect_to(self.host, self.port)
return
# If same_server is False, connnect to master in master mode
# If same_server is False, connnect to master in master mode
# and rotate to the next slave in slave mode
if self.connection_pool.is_master:
await self.connect_to(await self.connection_pool.get_master_address())
Expand All @@ -84,7 +84,7 @@ async def connect(self):

async def connect_to_same_address(self):
"""
Similar to connect, but instead of rotating to the next slave (if not in master mode),
Similar to connect, but instead of rotating to the next slave (in replica mode),
it just connects to the same address of the connection object.
"""
return await self.retry.call_with_retry(
Expand All @@ -94,7 +94,7 @@ async def connect_to_same_address(self):

async def connect_to_address(self, address):
"""
Similar to connect, but instead of rotating to the next slave (if not in master mode),
Similar to connect, but instead of rotating to the next slave (in replica mode),
it just connects to the same address of the connection object.
"""
self.host, self.port = address
Expand Down Expand Up @@ -202,13 +202,15 @@ async def rotate_slaves(self) -> AsyncIterator:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

async def ensure_connection_connected_to_address(self, connection: SentinelManagedConnection):
async def ensure_connection_connected_to_address(
self, connection: SentinelManagedConnection
):
"""
Ensure the connection is already connected to the server that this connection
object wants to connect to

Similar to self.ensure_connection, but calling connection.connect()
in SentinelManagedConnection (replica mode) will cause the
in SentinelManagedConnection (replica mode) will cause the
connection object to connect to the next replica in rotation,
and we don't wnat behavior. Look at get_connection inline docs for details.

Expand All @@ -217,7 +219,10 @@ async def ensure_connection_connected_to_address(self, connection: SentinelManag
"""
await connection.connect_to_same_address()
try:
if await connection.can_read_destructive() and connection.client_cache is None:
if (
await connection.can_read_destructive()
and connection.client_cache is None
):
raise ConnectionError("Connection has data")
except (ConnectionError, OSError):
await connection.disconnect()
Expand Down Expand Up @@ -273,7 +278,7 @@ async def get_connection(
host=server_host, port=server_port
)
# If not, make a new dummy connection object, and set its host and
# port to the one that we want later in the call to ``connect_to_same_address``
# port to the one that we want later in the call to ``connect_to_address``
if not connection:
connection = self.make_connection()
assert connection
Expand Down
1 change: 0 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,6 @@ def reset(self) -> None:
self.patterns = {}
self.pending_unsubscribe_patterns = set()
self.subscribed_event.clear()
self.connection_pool.cleanup(iter_req_id=options.get("_iter_req_id", None))

def close(self) -> None:
self.reset()
Expand Down
4 changes: 2 additions & 2 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,10 +737,10 @@ def _host_error(self):

class ConnectionsIndexer(Iterable):
"""
Data structure that simulates a list of available connections.
Data structure that simulates a list of available connections.
Instead of list, we keep 2 additional DS to support O(1) operations
on all of the class' methods.
The first DS is indexed on the connection object's ID.
The first DS is indexed on the connection object's ID.
The second DS is indexed on the address (ip and port) of the connection.
"""

Expand Down
24 changes: 16 additions & 8 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _connect_retry(self, same_address: bool = False):
if same_address:
self.connect_to((self.host, self.port))
return
# If same_server is False, connnect to master in master mode
# If same_server is False, connnect to master in master mode
# and rotate to the next slave in slave mode
if self.connection_pool.is_master:
self.connect_to(self.connection_pool.get_master_address())
Expand All @@ -62,11 +62,14 @@ def _connect_retry(self, same_address: bool = False):
raise SlaveNotFoundError # Never be here

def connect(self):
return self.retry.call_with_retry(lambda: self._connect_retry(), lambda error: None)
return self.retry.call_with_retry(
lambda: self._connect_retry(),
lambda error: None
)

def connect_to_same_address(self):
"""
Similar to connect, but instead of rotating to the next slave (if not in master mode),
Similar to connect, but instead of rotating to the next slave (in replica mode),
it just connects to the same address of the connection object.
"""
return self.retry.call_with_retry(
Expand All @@ -76,14 +79,17 @@ def connect_to_same_address(self):

def connect_to_address(self, address):
"""
Similar to connect, but instead of rotating to the next slave (if not in master mode),
Similar to connect, but instead of rotating to the next slave (in replica mode),
it just connects to the address supplied.
"""
self.host, self.port = address
return self.connect_to_same_address()

def can_read_same_address(self, timeout=0):
"""Similar to can_read_same_address, but calls connect_to_same_address instead of connect"""
"""
Similar to can_read_same_address, but calls
connect_to_same_address instead of connect
"""
sock = self._sock
if not sock:
self.connect_to_same_address()
Expand Down Expand Up @@ -234,13 +240,15 @@ def rotate_slaves(self):
"Round-robin slave balancer"
return self.proxy.rotate_slaves()

def ensure_connection_connected_to_address(self, connection: SentinelManagedConnection):
def ensure_connection_connected_to_address(
self, connection: SentinelManagedConnection
):
"""
Ensure the connection is already connected to the server that this connection
object wants to connect to
object wants to connect to.

Similar to self.ensure_connection, but calling connection.connect()
in SentinelManagedConnection (replica mode) will cause the
in SentinelManagedConnection (replica mode) will cause the
connection object to connect to the next replica in rotation,
and we don't wnat behavior. Look at get_connection inline docs for details.

Expand Down
11 changes: 3 additions & 8 deletions tests/test_sentinel_managed_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from unittest import mock

import pytest
from redis.sentinel import (
Sentinel,
SentinelConnectionPool,
SentinelManagedConnection,
)
from redis import Redis
from redis.sentinel import Sentinel, SentinelConnectionPool, SentinelManagedConnection


class SentinelManagedConnectionMock(SentinelManagedConnection):
def connect_to_same_address(self) -> None:
Expand Down Expand Up @@ -211,8 +209,6 @@ def test_scan_iter_in_redis_cleans_up(
connection_pool_replica_mock: SentinelConnectionPool,
):
"""Test that connection pool is correctly cleaned up"""
from redis import Redis

r = Redis(connection_pool=connection_pool_replica_mock)
# Patch the actual sending and parsing response from the Connection object
# but still let the connection pool does all the necessary work
Expand All @@ -221,4 +217,3 @@ def test_scan_iter_in_redis_cleans_up(
# Test that the iter_req_id for the scan command is cleared at the
# end of the SCAN ITER command
assert not connection_pool_replica_mock._iter_req_id_to_replica_address