Skip to content
6 changes: 4 additions & 2 deletions .github/workflows/install_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ cd ${TESTDIR}
# install, run tests
pip install ${PKG}
# Redis tests
pytest -m 'not onlycluster'
pytest -m 'not onlycluster' --ignore=tests/test_scenario
# RedisCluster tests
CLUSTER_URL="redis://localhost:16379/0"
CLUSTER_SSL_URL="rediss://localhost:27379/0"
pytest -m 'not onlynoncluster and not redismod and not ssl' \
--redis-url="${CLUSTER_URL}" --redis-ssl-url="${CLUSTER_SSL_URL}"
--ignore=tests/test_scenario \
--redis-url="${CLUSTER_URL}" \
--redis-ssl-url="${CLUSTER_SSL_URL}"
2 changes: 1 addition & 1 deletion redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def parse_moving_msg(response):
# Expected message format is: MOVING <seq_number> <time> <endpoint>
id = response[1]
ttl = response[2]
if response[3] in [b"null", "null"]:
if response[3] is None:
host, port = None, None
else:
value = response[3]
Expand Down
44 changes: 26 additions & 18 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1720,7 +1720,7 @@ def __init__(
self._cache_factory = cache_factory

if connection_kwargs.get("cache_config") or connection_kwargs.get("cache"):
if connection_kwargs.get("protocol") not in [3, "3"]:
if self.connection_kwargs.get("protocol") not in [3, "3"]:
raise RedisError("Client caching is only supported with RESP version 3")

cache = self.connection_kwargs.get("cache")
Expand All @@ -1741,31 +1741,21 @@ def __init__(
connection_kwargs.pop("cache", None)
connection_kwargs.pop("cache_config", None)

if connection_kwargs.get(
if self.connection_kwargs.get(
"maintenance_events_pool_handler"
) or connection_kwargs.get("maintenance_events_config"):
if connection_kwargs.get("protocol") not in [3, "3"]:
) or self.connection_kwargs.get("maintenance_events_config"):
if self.connection_kwargs.get("protocol") not in [3, "3"]:
raise RedisError(
"Push handlers on connection are only supported with RESP version 3"
)
config = connection_kwargs.get("maintenance_events_config", None) or (
connection_kwargs.get("maintenance_events_pool_handler").config
if connection_kwargs.get("maintenance_events_pool_handler")
config = self.connection_kwargs.get("maintenance_events_config", None) or (
self.connection_kwargs.get("maintenance_events_pool_handler").config
if self.connection_kwargs.get("maintenance_events_pool_handler")
else None
)

if config and config.enabled:
connection_kwargs.update(
{
"orig_host_address": connection_kwargs.get("host"),
"orig_socket_timeout": connection_kwargs.get(
"socket_timeout", None
),
"orig_socket_connect_timeout": connection_kwargs.get(
"socket_connect_timeout", None
),
}
)
self._update_connection_kwargs_for_maintenance_events()

self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
if self._event_dispatcher is None:
Expand Down Expand Up @@ -1821,6 +1811,7 @@ def set_maintenance_events_pool_handler(
"maintenance_events_config": maintenance_events_pool_handler.config,
}
)
self._update_connection_kwargs_for_maintenance_events()

self._update_maintenance_events_configs_for_connections(
maintenance_events_pool_handler
Expand All @@ -1838,6 +1829,23 @@ def _update_maintenance_events_configs_for_connections(
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
conn.maintenance_events_config = maintenance_events_pool_handler.config

def _update_connection_kwargs_for_maintenance_events(self):
"""Store original connection parameters for maintenance events."""
if self.connection_kwargs.get("orig_host_address", None) is None:
# If orig_host_address is None it means we haven't
# configured the original values yet
self.connection_kwargs.update(
{
"orig_host_address": self.connection_kwargs.get("host"),
"orig_socket_timeout": self.connection_kwargs.get(
"socket_timeout", None
),
"orig_socket_connect_timeout": self.connection_kwargs.get(
"socket_connect_timeout", None
),
}
)

def reset(self) -> None:
self._created_connections = 0
self._available_connections = []
Expand Down
2 changes: 1 addition & 1 deletion redis/maintenance_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def __init__(
self,
enabled: bool = True,
proactive_reconnect: bool = True,
relax_timeout: Optional[Number] = 20,
relax_timeout: Optional[Number] = 10,
endpoint_type: Optional[EndpointType] = None,
):
"""
Expand Down
8 changes: 4 additions & 4 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def standalone_tests(

if uvloop:
run(
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml"
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml"
)
else:
run(
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml"
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml"
)


Expand All @@ -74,11 +74,11 @@ def cluster_tests(c, uvloop=False, protocol=2, profile=False):
cluster_tls_url = "rediss://localhost:27379/0"
if uvloop:
run(
f"pytest {profile_arg} --protocol={protocol} --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop"
f"pytest {profile_arg} --protocol={protocol} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop"
)
else:
run(
f"pytest {profile_arg} --protocol={protocol} --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml"
f"pytest {profile_arg} --protocol={protocol} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml"
)


Expand Down
2 changes: 1 addition & 1 deletion tests/test_maintenance_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def test_init_defaults(self):
config = MaintenanceEventsConfig()
assert config.enabled is True
assert config.proactive_reconnect is True
assert config.relax_timeout == 20
assert config.relax_timeout == 10

def test_init_custom_values(self):
"""Test MaintenanceEventsConfig initialization with custom values."""
Expand Down
4 changes: 1 addition & 3 deletions tests/test_maintenance_events_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,7 @@ def send(self, data):
# MOVING push message before SET key_receive_moving_none_X response
# Format: >4\r\n$6\r\nMOVING\r\n:1\r\n:1\r\n+null\r\n (4 elements: MOVING, id, ttl, null)
# Note: Using + instead of $ to send as simple string instead of bulk string
moving_push = (
f">4\r\n$6\r\nMOVING\r\n:1\r\n:{MOVING_TIMEOUT}\r\n+null\r\n"
)
moving_push = f">4\r\n$6\r\nMOVING\r\n:1\r\n:{MOVING_TIMEOUT}\r\n_\r\n"
response = moving_push.encode() + response
elif b"key_receive_moving_" in data:
# MOVING push message before SET key_receive_moving_X response
Expand Down
120 changes: 120 additions & 0 deletions tests/test_scenario/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import logging
import os
from typing import Optional
from urllib.parse import urlparse
import pytest

from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.client import Redis
from redis.maintenance_events import EndpointType, MaintenanceEventsConfig
from redis.retry import Retry
from tests.test_scenario.fault_injector_client import FaultInjectorClient

RELAX_TIMEOUT = 30
CLIENT_TIMEOUT = 5

DEFAULT_ENDPOINT_NAME = "m-standard"


@pytest.fixture()
def endpoint_name(request):
return request.config.getoption("--endpoint-name") or os.getenv(
"REDIS_ENDPOINT_NAME", DEFAULT_ENDPOINT_NAME
)


@pytest.fixture()
def endpoints_config(endpoint_name: str):
endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH", None)

if not (endpoints_config and os.path.exists(endpoints_config)):
raise FileNotFoundError(f"Endpoints config file not found: {endpoints_config}")

try:
with open(endpoints_config, "r") as f:
data = json.load(f)
db = data[endpoint_name]
return db
except Exception as e:
raise ValueError(
f"Failed to load endpoints config file: {endpoints_config}"
) from e


@pytest.fixture()
def fault_injector_client():
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
return FaultInjectorClient(url)


@pytest.fixture()
def client_maint_events(endpoints_config):
return _get_client_maint_events(endpoints_config)


def _get_client_maint_events(
endpoints_config,
enable_maintenance_events: bool = True,
endpoint_type: Optional[EndpointType] = None,
enable_relax_timeout: bool = True,
enable_proactive_reconnect: bool = True,
disable_retries: bool = False,
socket_timeout: Optional[float] = None,
):
"""Create Redis client with maintenance events enabled."""

# Get credentials from the configuration
username = endpoints_config.get("username")
password = endpoints_config.get("password")

# Parse host and port from endpoints URL
endpoints = endpoints_config.get("endpoints", [])
if not endpoints:
raise ValueError("No endpoints found in configuration")

parsed = urlparse(endpoints[0])
host = parsed.hostname
port = parsed.port

tls_enabled = True if parsed.scheme == "rediss" else False

if not host:
raise ValueError(f"Could not parse host from endpoint URL: {endpoints[0]}")

logging.info(f"Connecting to Redis Enterprise: {host}:{port} with user: {username}")

# Configure maintenance events
maintenance_config = MaintenanceEventsConfig(
enabled=enable_maintenance_events,
proactive_reconnect=enable_proactive_reconnect,
relax_timeout=RELAX_TIMEOUT if enable_relax_timeout else -1,
endpoint_type=endpoint_type,
)

# Create Redis client with maintenance events config
# This will automatically create the MaintenanceEventPoolHandler
if disable_retries:
retry = Retry(NoBackoff(), 0)
else:
retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3)

client = Redis(
host=host,
port=port,
socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout,
username=username,
password=password,
ssl=tls_enabled,
ssl_cert_reqs="none",
ssl_check_hostname=False,
protocol=3, # RESP3 required for push notifications
maintenance_events_config=maintenance_config,
retry=retry,
)
logging.info("Redis client created with maintenance events enabled")
logging.info(f"Client uses Protocol: {client.connection_pool.get_protocol()}")
maintenance_handler_exists = client.maintenance_events_pool_handler is not None
logging.info(f"Maintenance events pool handler: {maintenance_handler_exists}")

return client
44 changes: 44 additions & 0 deletions tests/test_scenario/fault_injector_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
import json
import logging
import time
import urllib.request
from typing import Dict, Any, Optional, Union
from enum import Enum

import pytest


class TaskStatuses:
"""Class to hold completed statuses constants."""

FAILED = "failed"
FINISHED = "finished"
SUCCESS = "success"
RUNNING = "running"

COMPLETED_STATUSES = [FAILED, FINISHED, SUCCESS]


class ActionType(str, Enum):
DMC_RESTART = "dmc_restart"
Expand Down Expand Up @@ -103,3 +118,32 @@ def execute_rladmin_command(
error_body = json.loads(e.read().decode("utf-8"))
raise ValueError(f"Validation Error: {error_body}")
raise

def get_operation_result(
self,
action_id: str,
timeout: int = 60,
) -> Dict[str, Any]:
"""Get the result of a specific action"""
start_time = time.time()
check_interval = 3
while time.time() - start_time < timeout:
try:
status_result = self.get_action_status(action_id)
operation_status = status_result.get("status", "unknown")

if operation_status in TaskStatuses.COMPLETED_STATUSES:
logging.debug(
f"Operation {action_id} completed with status: "
f"{operation_status}"
)
if operation_status != TaskStatuses.SUCCESS:
pytest.fail(f"Operation {action_id} failed: {status_result}")
return status_result

time.sleep(check_interval)
except Exception as e:
logging.warning(f"Error checking operation status: {e}")
time.sleep(check_interval)
else:
raise TimeoutError(f"Timeout waiting for operation {action_id}")
Loading