Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support redis sentinel mode #7756

Merged
merged 4 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion api/configs/middleware/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Optional
from urllib.parse import quote_plus

from pydantic import Field, NonNegativeInt, PositiveInt, computed_field
from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt, computed_field
from pydantic_settings import BaseSettings

from configs.middleware.cache.redis_config import RedisConfig
Expand Down Expand Up @@ -157,6 +157,21 @@ class CeleryConfig(DatabaseConfig):
default=None,
)

CELERY_USE_SENTINEL: Optional[bool] = Field(
description="Whether to use Redis Sentinel mode",
default=False,
)

CELERY_SENTINEL_MASTER_NAME: Optional[str] = Field(
description="Redis Sentinel master name",
default=None,
)

CELERY_SENTINEL_SOCKET_TIMEOUT: Optional[PositiveFloat] = Field(
description="Redis Sentinel socket timeout",
default=0.1,
)

@computed_field
@property
def CELERY_RESULT_BACKEND(self) -> str | None:
Expand Down
32 changes: 31 additions & 1 deletion api/configs/middleware/cache/redis_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from pydantic import Field, NonNegativeInt, PositiveInt
from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt
from pydantic_settings import BaseSettings


Expand Down Expand Up @@ -38,3 +38,33 @@ class RedisConfig(BaseSettings):
description="whether to use SSL for Redis connection",
default=False,
)

REDIS_USE_SENTINEL: Optional[bool] = Field(
description="Whether to use Redis Sentinel mode",
default=False,
)

REDIS_SENTINELS: Optional[str] = Field(
description="Redis Sentinel nodes",
default=None,
)

REDIS_SENTINEL_SERVICE_NAME: Optional[str] = Field(
description="Redis Sentinel service name",
default=None,
)

REDIS_SENTINEL_USERNAME: Optional[str] = Field(
description="Redis Sentinel username",
default=None,
)

REDIS_SENTINEL_PASSWORD: Optional[str] = Field(
description="Redis Sentinel password",
default=None,
)

REDIS_SENTINEL_SOCKET_TIMEOUT: Optional[PositiveFloat] = Field(
description="Redis Sentinel socket timeout",
default=0.1,
)
21 changes: 16 additions & 5 deletions api/extensions/ext_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)

broker_transport_options = {}

if app.config.get("CELERY_USE_SENTINEL"):
broker_transport_options = {
"master_name": app.config.get("CELERY_SENTINEL_MASTER_NAME"),
"sentinel_kwargs": {
"socket_timeout": app.config.get("CELERY_SENTINEL_SOCKET_TIMEOUT", 0.1),
},
}

celery_app = Celery(
app.name,
task_cls=FlaskTask,
broker=app.config["CELERY_BROKER_URL"],
backend=app.config["CELERY_BACKEND"],
broker=app.config.get("CELERY_BROKER_URL"),
backend=app.config.get("CELERY_BACKEND"),
task_ignore_result=True,
)

Expand All @@ -27,11 +37,12 @@ def __call__(self, *args: object, **kwargs: object) -> object:
}

celery_app.conf.update(
result_backend=app.config["CELERY_RESULT_BACKEND"],
result_backend=app.config.get("CELERY_RESULT_BACKEND"),
broker_transport_options=broker_transport_options,
broker_connection_retry_on_startup=True,
)

if app.config["BROKER_USE_SSL"]:
if app.config.get("BROKER_USE_SSL"):
celery_app.conf.update(
broker_use_ssl=ssl_options, # Add the SSL options to the broker configuration
)
Expand All @@ -43,7 +54,7 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"schedule.clean_embedding_cache_task",
"schedule.clean_unused_datasets_task",
]
day = app.config["CELERY_BEAT_SCHEDULER_TIME"]
day = app.config.get("CELERY_BEAT_SCHEDULER_TIME")
beat_schedule = {
"clean_embedding_cache_task": {
"task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
Expand Down
85 changes: 71 additions & 14 deletions api/extensions/ext_redis.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,83 @@
import redis
from redis.connection import Connection, SSLConnection
from redis.sentinel import Sentinel

redis_client = redis.Redis()

class RedisClientWrapper(redis.Redis):
"""
A wrapper class for the Redis client that addresses the issue where the global
`redis_client` variable cannot be updated when a new Redis instance is returned
by Sentinel.

This class allows for deferred initialization of the Redis client, enabling the
client to be re-initialized with a new instance when necessary. This is particularly
useful in scenarios where the Redis instance may change dynamically, such as during
a failover in a Sentinel-managed Redis setup.

Attributes:
_client (redis.Redis): The actual Redis client instance. It remains None until
initialized with the `initialize` method.

Methods:
initialize(client): Initializes the Redis client if it hasn't been initialized already.
__getattr__(item): Delegates attribute access to the Redis client, raising an error
if the client is not initialized.
"""

def __init__(self):
self._client = None

def initialize(self, client):
if self._client is None:
self._client = client

def __getattr__(self, item):
if self._client is None:
raise RuntimeError("Redis client is not initialized. Call init_app first.")
return getattr(self._client, item)


redis_client = RedisClientWrapper()


def init_app(app):
global redis_client
connection_class = Connection
if app.config.get("REDIS_USE_SSL"):
connection_class = SSLConnection

redis_client.connection_pool = redis.ConnectionPool(
**{
"host": app.config.get("REDIS_HOST"),
"port": app.config.get("REDIS_PORT"),
"username": app.config.get("REDIS_USERNAME"),
"password": app.config.get("REDIS_PASSWORD"),
"db": app.config.get("REDIS_DB"),
"encoding": "utf-8",
"encoding_errors": "strict",
"decode_responses": False,
},
connection_class=connection_class,
)
redis_params = {
"username": app.config.get("REDIS_USERNAME"),
"password": app.config.get("REDIS_PASSWORD"),
"db": app.config.get("REDIS_DB"),
"encoding": "utf-8",
"encoding_errors": "strict",
"decode_responses": False,
}

if app.config.get("REDIS_USE_SENTINEL"):
sentinel_hosts = [
(node.split(":")[0], int(node.split(":")[1])) for node in app.config.get("REDIS_SENTINELS").split(",")
]
sentinel = Sentinel(
sentinel_hosts,
sentinel_kwargs={
"socket_timeout": app.config.get("REDIS_SENTINEL_SOCKET_TIMEOUT", 0.1),
"username": app.config.get("REDIS_SENTINEL_USERNAME"),
"password": app.config.get("REDIS_SENTINEL_PASSWORD"),
},
)
master = sentinel.master_for(app.config.get("REDIS_SENTINEL_SERVICE_NAME"), **redis_params)
redis_client.initialize(master)
else:
redis_params.update(
{
"host": app.config.get("REDIS_HOST"),
"port": app.config.get("REDIS_PORT"),
"connection_class": connection_class,
}
)
pool = redis.ConnectionPool(**redis_params)
redis_client.initialize(redis.Redis(connection_pool=pool))

app.extensions["redis"] = redis_client
19 changes: 19 additions & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,35 @@ REDIS_USERNAME=
REDIS_PASSWORD=difyai123456
REDIS_USE_SSL=false

# Whether to use Redis Sentinel mode.
# If set to true, the application will automatically discover and connect to the master node through Sentinel.
REDIS_USE_SENTINEL=false

# List of Redis Sentinel nodes. If Sentinel mode is enabled, provide at least one Sentinel IP and port.
# Format: `<sentinel1_ip>:<sentinel1_port>,<sentinel2_ip>:<sentinel2_port>,<sentinel3_ip>:<sentinel3_port>`
REDIS_SENTINELS=
REDIS_SENTINEL_SERVICE_NAME=
REDIS_SENTINEL_USERNAME=
REDIS_SENTINEL_PASSWORD=
REDIS_SENTINEL_SOCKET_TIMEOUT=0.1

# ------------------------------
# Celery Configuration
# ------------------------------

# Use redis as the broker, and redis db 1 for celery broker.
# Format as follows: `redis://<redis_username>:<redis_password>@<redis_host>:<redis_port>/<redis_database>`
# Example: redis://:difyai123456@redis:6379/1
# If use Redis Sentinel, format as follows: `sentinel://<sentinel_username>:<sentinel_password>@<sentinel_host>:<sentinel_port>/<redis_database>`
# Example: sentinel://localhost:26379/1;sentinel://localhost:26380/1;sentinel://localhost:26381/1
CELERY_BROKER_URL=redis://:difyai123456@redis:6379/1
BROKER_USE_SSL=false

# If you are using Redis Sentinel for high availability, configure the following settings.
CELERY_USE_SENTINEL=false
CELERY_SENTINEL_MASTER_NAME=
CELERY_SENTINEL_SOCKET_TIMEOUT=0.1

# ------------------------------
# CORS Configuration
# Used to set the front-end cross-domain access policy.
Expand Down
9 changes: 9 additions & 0 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,17 @@ x-shared-env: &shared-api-worker-env
REDIS_PASSWORD: ${REDIS_PASSWORD:-difyai123456}
REDIS_USE_SSL: ${REDIS_USE_SSL:-false}
REDIS_DB: 0
REDIS_USE_SENTINEL: ${REDIS_USE_SENTINEL:-false}
REDIS_SENTINELS: ${REDIS_SENTINELS:-}
REDIS_SENTINEL_SERVICE_NAME: ${REDIS_SENTINEL_SERVICE_NAME:-}
REDIS_SENTINEL_USERNAME: ${REDIS_SENTINEL_USERNAME:-}
REDIS_SENTINEL_PASSWORD: ${REDIS_SENTINEL_PASSWORD:-}
REDIS_SENTINEL_SOCKET_TIMEOUT: ${REDIS_SENTINEL_SOCKET_TIMEOUT:-}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://:difyai123456@redis:6379/1}
BROKER_USE_SSL: ${BROKER_USE_SSL:-false}
CELERY_USE_SENTINEL: ${CELERY_USE_SENTINEL:-false}
CELERY_SENTINEL_MASTER_NAME: ${CELERY_SENTINEL_MASTER_NAME:-}
CELERY_SENTINEL_SOCKET_TIMEOUT: ${CELERY_SENTINEL_SOCKET_TIMEOUT:-}
WEB_API_CORS_ALLOW_ORIGINS: ${WEB_API_CORS_ALLOW_ORIGINS:-*}
CONSOLE_CORS_ALLOW_ORIGINS: ${CONSOLE_CORS_ALLOW_ORIGINS:-*}
STORAGE_TYPE: ${STORAGE_TYPE:-local}
Expand Down