Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix scan iter command issued to different replicas #1

Open
wants to merge 49 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
b2179f6
Close Unix sockets if the connection attempt fails (#3315)
kurtmckee Jul 17, 2024
b206a0f
Suppress `Graph` deprecation warning in test suite (#3316)
kurtmckee Jul 17, 2024
9607608
Close SSL sockets when connections/validations fail (#3318)
kurtmckee Jul 17, 2024
7e2b4fb
Upgrade Flake8 (#3323)
gerzse Jul 18, 2024
2ffcac3
Resolve some docs warnings (#3322)
kurtmckee Jul 18, 2024
9fb0376
fix scan iter command issued to different replicas
agnesnatasya Apr 26, 2024
48034de
add tests
agnesnatasya Apr 26, 2024
824eada
reorder
agnesnatasya Apr 27, 2024
8f2ec5f
remove ignore
agnesnatasya Apr 27, 2024
ca9bd63
lint and format
agnesnatasya Apr 27, 2024
076e592
better inline
agnesnatasya Apr 29, 2024
224f253
backward compatible typing
agnesnatasya Apr 29, 2024
8081e03
test inline docs
agnesnatasya May 10, 2024
ed2b539
add tests for all scan iter family
agnesnatasya May 10, 2024
5d7800e
lint
agnesnatasya May 10, 2024
b964b9b
implement in sync client
agnesnatasya Jul 9, 2024
e5c74ac
more features for ConnectionsINdexer
agnesnatasya Jul 14, 2024
83dc599
add _same_addres methods for sentinels
agnesnatasya Jul 14, 2024
546b443
fix connect_to args
agnesnatasya Jul 14, 2024
803b291
fix tests
agnesnatasya Jul 14, 2024
c770804
add self
agnesnatasya Jul 14, 2024
6add0ef
convert ConnectionsIndexer to list before indexing
agnesnatasya Jul 14, 2024
7d6c2a4
convert ConnectionsIndexer to list before indexing
agnesnatasya Jul 14, 2024
9b52903
fix typo
agnesnatasya Jul 14, 2024
e6375b2
fix
agnesnatasya Jul 14, 2024
d7f8e90
fix connect_to_address
agnesnatasya Jul 14, 2024
d1c25b9
cleanup in sync client
agnesnatasya Jul 16, 2024
b8aeb71
rename kwargs to no underscore for consistency
agnesnatasya Jul 16, 2024
ca7d0ff
add cleanup tests for pipeline
agnesnatasya Jul 16, 2024
b21f1ea
remove test for pipeline
agnesnatasya Jul 16, 2024
24b3072
lints
agnesnatasya Jul 16, 2024
1d29914
reformat
agnesnatasya Jul 16, 2024
a4e20ac
def cleanup in base class
agnesnatasya Jul 16, 2024
12e6495
fix some tests
agnesnatasya Jul 16, 2024
a7ffda8
rename iter_req_id properly
agnesnatasya Jul 16, 2024
8a1bc06
fix tests
agnesnatasya Jul 16, 2024
f504741
set fix address as a property of SentinelManagedConnection
agnesnatasya Jul 19, 2024
c75db8d
lint
agnesnatasya Jul 20, 2024
0f5e079
make mock class have same behavior as actual class
agnesnatasya Jul 20, 2024
ef0d969
define _connect_to_sentinel in async server
agnesnatasya Jul 20, 2024
bb69c85
mock can_read_destructive for parser
agnesnatasya Jul 20, 2024
a4601a7
skip test sentinel managed connection if hirediswq
agnesnatasya Jul 20, 2024
88501a0
undo ensure_connection deduplication in BlockingConnectionPool
agnesnatasya Jul 20, 2024
c75262b
import HIREDIS
agnesnatasya Jul 20, 2024
cdbe957
polymorphism for reset available connections instead
agnesnatasya Jul 20, 2024
ac20752
merge
agnesnatasya Jul 20, 2024
9405695
lint
agnesnatasya Jul 20, 2024
a1ece92
fix inline comments + rename
agnesnatasya Jul 20, 2024
eede649
lint
agnesnatasya Jul 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,4 @@ unicode
url
virtualenv
www
yaml
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
* Make `ClusterCommandsProtocol` an actual Protocol
* Add `sum` to DUPLICATE_POLICY documentation of `TS.CREATE`, `TS.ADD` and `TS.ALTER`
* Prevent async ClusterPipeline instances from becoming "false-y" in case of empty command stack (#3061)
* Close Unix sockets if the connection attempt fails. This prevents `ResourceWarning`s. (#3314)
* Close SSL sockets if the connection attempt fails, or if validations fail. (#3317)

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
4 changes: 2 additions & 2 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
black==24.3.0
cachetools
click==8.0.4
flake8-isort==6.0.0
flake8==5.0.4
flake8-isort
flake8
flynt~=0.69.0
invoke==2.2.0
mock
Expand Down
1 change: 0 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@
# further. For a list of options available for each theme, see the
# documentation.
html_theme_options = {
"display_version": True,
"footer_icons": [
{
"name": "GitHub",
Expand Down
6 changes: 3 additions & 3 deletions docs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ClusterNode
Async Client
************

See complete example: `here <examples/asyncio_examples.html>`_
See complete example: `here <examples/asyncio_examples.html>`__

This client is used for communicating with Redis, asynchronously.

Expand Down Expand Up @@ -88,7 +88,7 @@ ClusterPipeline (Async)
Connection
**********

See complete example: `here <examples/connection_examples.html>`_
See complete example: `here <examples/connection_examples.html>`__

Connection
==========
Expand All @@ -104,7 +104,7 @@ Connection (Async)
Connection Pools
****************

See complete example: `here <examples/connection_examples.html>`_
See complete example: `here <examples/connection_examples.html>`__

ConnectionPool
==============
Expand Down
8 changes: 4 additions & 4 deletions docs/opentelemetry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Integrating OpenTelemetry
What is OpenTelemetry?
----------------------

`OpenTelemetry <https://opentelemetry.io>`_ is an open-source observability framework for traces, metrics, and logs. It is a merger of OpenCensus and OpenTracing projects hosted by Cloud Native Computing Foundation.
`OpenTelemetry <https://opentelemetry.io>`__ is an open-source observability framework for traces, metrics, and logs. It is a merger of OpenCensus and OpenTracing projects hosted by Cloud Native Computing Foundation.

OpenTelemetry allows developers to collect and export telemetry data in a vendor agnostic way. With OpenTelemetry, you can instrument your application once and then add or change vendors without changing the instrumentation, for example, here is a list of `popular DataDog competitors <https://uptrace.dev/get/compare/datadog-competitors.html>`_ that support OpenTelemetry.

Expand Down Expand Up @@ -61,7 +61,7 @@ Once the code is patched, you can use redis-py as usually:
OpenTelemetry API
-----------------

`OpenTelemetry <https://uptrace.dev/opentelemetry/>`_ API is a programming interface that you can use to instrument code and collect telemetry data such as traces, metrics, and logs.
`OpenTelemetry API <https://uptrace.dev/opentelemetry/>`__ is a programming interface that you can use to instrument code and collect telemetry data such as traces, metrics, and logs.

You can use OpenTelemetry API to measure important operations:

Expand Down Expand Up @@ -125,7 +125,7 @@ Alerting and notifications

Uptrace also allows you to monitor `OpenTelemetry metrics <https://uptrace.dev/opentelemetry/metrics.html>`_ using alerting rules. For example, the following monitor uses the group by node expression to create an alert whenever an individual Redis shard is down:

.. code-block:: python
.. code-block:: yaml

monitors:
- name: Redis shard is down
Expand All @@ -142,7 +142,7 @@ Uptrace also allows you to monitor `OpenTelemetry metrics <https://uptrace.dev/o

You can also create queries with more complex expressions. For example, the following rule creates an alert when the keyspace hit rate is lower than 75%:

.. code-block:: python
.. code-block:: yaml

monitors:
- name: Redis read hit rate < 75%
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ markers =
experimental: run only experimental tests
asyncio_mode = auto
timeout = 30
filterwarnings =
always
ignore:RedisGraph support is deprecated as of Redis Stack 7.2:DeprecationWarning
2 changes: 1 addition & 1 deletion redis/_parsers/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def parse_geosearch_generic(response, **options):
except KeyError: # it means the command was sent via execute_command
return response

if type(response) != list:
if not isinstance(response, list):
response_list = [response]
else:
response_list = response
Expand Down
4 changes: 4 additions & 0 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ async def execute_command(self, *args, **options):
finally:
if not self.connection:
await pool.release(conn)
# Do additional cleanup if this is part of a SCAN ITER family command.
# It's possible that this is just a pure SCAN family command though.
if "SCAN" in command_name.upper():
pool.cleanup(iter_req_id=options.get("iter_req_id", None))

async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
Expand Down
18 changes: 16 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,14 @@ class ConnectionPool:
``connection_class``.
"""

def cleanup(self, **options):
"""
Additional cleanup operations that the connection pool might need to do.
See SentinelManagedConnection for an example cleanup operation that
might need to be done.
"""
pass

@classmethod
def from_url(cls: Type[_CP], url: str, **kwargs) -> _CP:
"""
Expand Down Expand Up @@ -1118,7 +1126,7 @@ def __init__(
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections

self._available_connections: List[AbstractConnection] = []
self._available_connections = self.reset_available_connections()
self._in_use_connections: Set[AbstractConnection] = set()
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)

Expand All @@ -1129,9 +1137,12 @@ def __repr__(self):
)

def reset(self):
self._available_connections = []
self._available_connections = self.reset_available_connections()
self._in_use_connections = weakref.WeakSet()

def reset_available_connections(self):
return []

def can_get_connection(self) -> bool:
"""Return True if a connection can be retrieved from the pool."""
return (
Expand Down Expand Up @@ -1324,3 +1335,6 @@ async def release(self, connection: AbstractConnection):
async with self._condition:
await super().release(connection)
self._condition.notify()

def cleanup(self, **options):
pass
111 changes: 110 additions & 1 deletion redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import asyncio
import random
import weakref
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
from typing import (
Any,
AsyncIterator,
Iterable,
Mapping,
Optional,
Sequence,
Tuple,
Type,
)

from redis.asyncio.client import Redis
from redis.asyncio.connection import (
Expand All @@ -12,6 +21,7 @@
)
from redis.commands import AsyncSentinelCommands
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
from redis.sentinel import ConnectionsIndexer
from redis.utils import str_if_bytes


Expand All @@ -26,6 +36,10 @@ class SlaveNotFoundError(ConnectionError):
class SentinelManagedConnection(Connection):
def __init__(self, **kwargs):
self.connection_pool = kwargs.pop("connection_pool")
# To be set to True if we want to prevent
# the connection to connect to the most relevant sentinel
# in the pool and just connect to the current host and port
self._is_address_set = False
super().__init__(**kwargs)

def __repr__(self):
Expand All @@ -39,6 +53,14 @@ def __repr__(self):
s += host_info
return s + ")>"

def set_address(self, address):
"""
By setting the address, the connection will just connect
to the current host and port the next time connect is called.
"""
self.host, self.port = address
self._is_address_set = True

async def connect_to(self, address):
self.host, self.port = address
await super().connect()
Expand All @@ -50,6 +72,14 @@ async def connect_to(self, address):
async def _connect_retry(self):
if self._reader:
return # already connected
# If address is fixed, it means that the connection
# just connect to the current host and port
if self._is_address_set:
await self.connect_to((self.host, self.port))
return
await self._connect_to_sentinel()

async def _connect_to_sentinel(self):
if self.connection_pool.is_master:
await self.connect_to(await self.connection_pool.get_master_address())
else:
Expand Down Expand Up @@ -122,6 +152,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
self.sentinel_manager = sentinel_manager
self.master_address = None
self.slave_rr_counter = None
self._iter_req_id_to_replica_address = {}

def __repr__(self):
return (
Expand All @@ -134,6 +165,9 @@ def reset(self):
self.master_address = None
self.slave_rr_counter = None

def reset_available_connections(self):
return ConnectionsIndexer()

def owns_connection(self, connection: Connection):
check = not self.is_master or (
self.is_master and self.master_address == (connection.host, connection.port)
Expand Down Expand Up @@ -167,6 +201,81 @@ async def rotate_slaves(self) -> AsyncIterator:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

def cleanup(self, **options):
"""
Remove the SCAN ITER family command's request id from the dictionary
"""
self._iter_req_id_to_replica_address.pop(options.get("iter_req_id", None), None)

async def get_connection(
self, command_name: str, *keys: Any, **options: Any
) -> SentinelManagedConnection:
"""
Get a connection from the pool.
'xxxscan_iter' ('scan_iter', 'hscan_iter', 'sscan_iter', 'zscan_iter')
commands needs to be handled specially.
If the client is created using a connection pool, in replica mode,
all 'scan' command-equivalent of the 'xxx_scan_iter' commands needs
to be issued to the same Redis replica.

The way each server positions each key is different with one another,
and the cursor acts as the offset of the scan.
Hence, all scans coming from a single 'xxx_scan_iter_channel' command
should go to the same replica.
"""
# If not an iter command or in master mode, call superclass' implementation
if not (iter_req_id := options.get("iter_req_id", None)) or self.is_master:
return await super().get_connection(command_name, *keys, **options)

# Check if this iter request has already been directed to a particular server
(
server_host,
server_port,
) = self._iter_req_id_to_replica_address.get(iter_req_id, (None, None))
connection = None
# If this is the first scan request of the iter command,
# get a connection from the pool
if server_host is None or server_port is None:
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
# If this is not the first scan request of the iter command
else:
# Get the connection that has the same host and port
connection = self._available_connections.get_connection(
host=server_host, port=server_port
)
# If not, make a new dummy connection object, and set its host and
# port to the one that we want later in the call to ``set_address``
if not connection:
connection = self.make_connection()
assert connection
self._in_use_connections.add(connection)
try:
# Ensure this connection is connected to Redis
# If this is the first scan request, it will
# call rotate_slaves and connect to a random replica
if server_port is None or server_port is None:
await connection.connect()
# If this is not the first scan request,
# connect to the previous replica.
# This will connect to the host and port of the replica
else:
connection.set_address((server_host, server_port))
await self.ensure_connection(connection)
except BaseException:
# Release the connection back to the pool so that we don't
# leak it
await self.release(connection)
raise
# Store the connection to the dictionary
self._iter_req_id_to_replica_address[iter_req_id] = (
connection.host,
connection.port,
)
return connection


class Sentinel(AsyncSentinelCommands):
"""
Expand Down
4 changes: 4 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ def execute_command(self, *args, **options):
finally:
if not self.connection:
pool.release(conn)
# Do additional cleanup if this is part of a SCAN ITER family command.
# It's possible that this is just a pure SCAN family command though.
if "SCAN" in command_name.upper():
pool.cleanup(iter_req_id=options.get("iter_req_id", None))

def parse_response(self, connection, command_name, **options):
"""Parses a response from the Redis server"""
Expand Down
Loading
Loading