Skip to content

Commit

Permalink
feat: support redis sentinel mode (#7756)
Browse files Browse the repository at this point in the history
  • Loading branch information
erigo authored Sep 8, 2024
1 parent 2d7954c commit d542b15
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 21 deletions.
17 changes: 16 additions & 1 deletion api/configs/middleware/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Optional
from urllib.parse import quote_plus

from pydantic import Field, NonNegativeInt, PositiveInt, computed_field
from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt, computed_field
from pydantic_settings import BaseSettings

from configs.middleware.cache.redis_config import RedisConfig
Expand Down Expand Up @@ -158,6 +158,21 @@ class CeleryConfig(DatabaseConfig):
default=None,
)

CELERY_USE_SENTINEL: Optional[bool] = Field(
description="Whether to use Redis Sentinel mode",
default=False,
)

CELERY_SENTINEL_MASTER_NAME: Optional[str] = Field(
description="Redis Sentinel master name",
default=None,
)

CELERY_SENTINEL_SOCKET_TIMEOUT: Optional[PositiveFloat] = Field(
description="Redis Sentinel socket timeout",
default=0.1,
)

@computed_field
@property
def CELERY_RESULT_BACKEND(self) -> str | None:
Expand Down
32 changes: 31 additions & 1 deletion api/configs/middleware/cache/redis_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from pydantic import Field, NonNegativeInt, PositiveInt
from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt
from pydantic_settings import BaseSettings


Expand Down Expand Up @@ -38,3 +38,33 @@ class RedisConfig(BaseSettings):
description="whether to use SSL for Redis connection",
default=False,
)

REDIS_USE_SENTINEL: Optional[bool] = Field(
description="Whether to use Redis Sentinel mode",
default=False,
)

REDIS_SENTINELS: Optional[str] = Field(
description="Redis Sentinel nodes",
default=None,
)

REDIS_SENTINEL_SERVICE_NAME: Optional[str] = Field(
description="Redis Sentinel service name",
default=None,
)

REDIS_SENTINEL_USERNAME: Optional[str] = Field(
description="Redis Sentinel username",
default=None,
)

REDIS_SENTINEL_PASSWORD: Optional[str] = Field(
description="Redis Sentinel password",
default=None,
)

REDIS_SENTINEL_SOCKET_TIMEOUT: Optional[PositiveFloat] = Field(
description="Redis Sentinel socket timeout",
default=0.1,
)
21 changes: 16 additions & 5 deletions api/extensions/ext_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)

broker_transport_options = {}

if app.config.get("CELERY_USE_SENTINEL"):
broker_transport_options = {
"master_name": app.config.get("CELERY_SENTINEL_MASTER_NAME"),
"sentinel_kwargs": {
"socket_timeout": app.config.get("CELERY_SENTINEL_SOCKET_TIMEOUT", 0.1),
},
}

celery_app = Celery(
app.name,
task_cls=FlaskTask,
broker=app.config["CELERY_BROKER_URL"],
backend=app.config["CELERY_BACKEND"],
broker=app.config.get("CELERY_BROKER_URL"),
backend=app.config.get("CELERY_BACKEND"),
task_ignore_result=True,
)

Expand All @@ -27,11 +37,12 @@ def __call__(self, *args: object, **kwargs: object) -> object:
}

celery_app.conf.update(
result_backend=app.config["CELERY_RESULT_BACKEND"],
result_backend=app.config.get("CELERY_RESULT_BACKEND"),
broker_transport_options=broker_transport_options,
broker_connection_retry_on_startup=True,
)

if app.config["BROKER_USE_SSL"]:
if app.config.get("BROKER_USE_SSL"):
celery_app.conf.update(
broker_use_ssl=ssl_options, # Add the SSL options to the broker configuration
)
Expand All @@ -43,7 +54,7 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"schedule.clean_embedding_cache_task",
"schedule.clean_unused_datasets_task",
]
day = app.config["CELERY_BEAT_SCHEDULER_TIME"]
day = app.config.get("CELERY_BEAT_SCHEDULER_TIME")
beat_schedule = {
"clean_embedding_cache_task": {
"task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
Expand Down
85 changes: 71 additions & 14 deletions api/extensions/ext_redis.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,83 @@
import redis
from redis.connection import Connection, SSLConnection
from redis.sentinel import Sentinel

redis_client = redis.Redis()

class RedisClientWrapper(redis.Redis):
"""
A wrapper class for the Redis client that addresses the issue where the global
`redis_client` variable cannot be updated when a new Redis instance is returned
by Sentinel.
This class allows for deferred initialization of the Redis client, enabling the
client to be re-initialized with a new instance when necessary. This is particularly
useful in scenarios where the Redis instance may change dynamically, such as during
a failover in a Sentinel-managed Redis setup.
Attributes:
_client (redis.Redis): The actual Redis client instance. It remains None until
initialized with the `initialize` method.
Methods:
initialize(client): Initializes the Redis client if it hasn't been initialized already.
__getattr__(item): Delegates attribute access to the Redis client, raising an error
if the client is not initialized.
"""

def __init__(self):
self._client = None

def initialize(self, client):
if self._client is None:
self._client = client

def __getattr__(self, item):
if self._client is None:
raise RuntimeError("Redis client is not initialized. Call init_app first.")
return getattr(self._client, item)


redis_client = RedisClientWrapper()


def init_app(app):
global redis_client
connection_class = Connection
if app.config.get("REDIS_USE_SSL"):
connection_class = SSLConnection

redis_client.connection_pool = redis.ConnectionPool(
**{
"host": app.config.get("REDIS_HOST"),
"port": app.config.get("REDIS_PORT"),
"username": app.config.get("REDIS_USERNAME"),
"password": app.config.get("REDIS_PASSWORD"),
"db": app.config.get("REDIS_DB"),
"encoding": "utf-8",
"encoding_errors": "strict",
"decode_responses": False,
},
connection_class=connection_class,
)
redis_params = {
"username": app.config.get("REDIS_USERNAME"),
"password": app.config.get("REDIS_PASSWORD"),
"db": app.config.get("REDIS_DB"),
"encoding": "utf-8",
"encoding_errors": "strict",
"decode_responses": False,
}

if app.config.get("REDIS_USE_SENTINEL"):
sentinel_hosts = [
(node.split(":")[0], int(node.split(":")[1])) for node in app.config.get("REDIS_SENTINELS").split(",")
]
sentinel = Sentinel(
sentinel_hosts,
sentinel_kwargs={
"socket_timeout": app.config.get("REDIS_SENTINEL_SOCKET_TIMEOUT", 0.1),
"username": app.config.get("REDIS_SENTINEL_USERNAME"),
"password": app.config.get("REDIS_SENTINEL_PASSWORD"),
},
)
master = sentinel.master_for(app.config.get("REDIS_SENTINEL_SERVICE_NAME"), **redis_params)
redis_client.initialize(master)
else:
redis_params.update(
{
"host": app.config.get("REDIS_HOST"),
"port": app.config.get("REDIS_PORT"),
"connection_class": connection_class,
}
)
pool = redis.ConnectionPool(**redis_params)
redis_client.initialize(redis.Redis(connection_pool=pool))

app.extensions["redis"] = redis_client
19 changes: 19 additions & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,35 @@ REDIS_USERNAME=
REDIS_PASSWORD=difyai123456
REDIS_USE_SSL=false

# Whether to use Redis Sentinel mode.
# If set to true, the application will automatically discover and connect to the master node through Sentinel.
REDIS_USE_SENTINEL=false

# List of Redis Sentinel nodes. If Sentinel mode is enabled, provide at least one Sentinel IP and port.
# Format: `<sentinel1_ip>:<sentinel1_port>,<sentinel2_ip>:<sentinel2_port>,<sentinel3_ip>:<sentinel3_port>`
REDIS_SENTINELS=
REDIS_SENTINEL_SERVICE_NAME=
REDIS_SENTINEL_USERNAME=
REDIS_SENTINEL_PASSWORD=
REDIS_SENTINEL_SOCKET_TIMEOUT=0.1

# ------------------------------
# Celery Configuration
# ------------------------------

# Use redis as the broker, and redis db 1 for celery broker.
# Format as follows: `redis://<redis_username>:<redis_password>@<redis_host>:<redis_port>/<redis_database>`
# Example: redis://:difyai123456@redis:6379/1
# If use Redis Sentinel, format as follows: `sentinel://<sentinel_username>:<sentinel_password>@<sentinel_host>:<sentinel_port>/<redis_database>`
# Example: sentinel://localhost:26379/1;sentinel://localhost:26380/1;sentinel://localhost:26381/1
CELERY_BROKER_URL=redis://:difyai123456@redis:6379/1
BROKER_USE_SSL=false

# If you are using Redis Sentinel for high availability, configure the following settings.
CELERY_USE_SENTINEL=false
CELERY_SENTINEL_MASTER_NAME=
CELERY_SENTINEL_SOCKET_TIMEOUT=0.1

# ------------------------------
# CORS Configuration
# Used to set the front-end cross-domain access policy.
Expand Down
9 changes: 9 additions & 0 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,17 @@ x-shared-env: &shared-api-worker-env
REDIS_PASSWORD: ${REDIS_PASSWORD:-difyai123456}
REDIS_USE_SSL: ${REDIS_USE_SSL:-false}
REDIS_DB: 0
REDIS_USE_SENTINEL: ${REDIS_USE_SENTINEL:-false}
REDIS_SENTINELS: ${REDIS_SENTINELS:-}
REDIS_SENTINEL_SERVICE_NAME: ${REDIS_SENTINEL_SERVICE_NAME:-}
REDIS_SENTINEL_USERNAME: ${REDIS_SENTINEL_USERNAME:-}
REDIS_SENTINEL_PASSWORD: ${REDIS_SENTINEL_PASSWORD:-}
REDIS_SENTINEL_SOCKET_TIMEOUT: ${REDIS_SENTINEL_SOCKET_TIMEOUT:-}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://:difyai123456@redis:6379/1}
BROKER_USE_SSL: ${BROKER_USE_SSL:-false}
CELERY_USE_SENTINEL: ${CELERY_USE_SENTINEL:-false}
CELERY_SENTINEL_MASTER_NAME: ${CELERY_SENTINEL_MASTER_NAME:-}
CELERY_SENTINEL_SOCKET_TIMEOUT: ${CELERY_SENTINEL_SOCKET_TIMEOUT:-}
WEB_API_CORS_ALLOW_ORIGINS: ${WEB_API_CORS_ALLOW_ORIGINS:-*}
CONSOLE_CORS_ALLOW_ORIGINS: ${CONSOLE_CORS_ALLOW_ORIGINS:-*}
STORAGE_TYPE: ${STORAGE_TYPE:-local}
Expand Down

0 comments on commit d542b15

Please sign in to comment.