[feat] add elasticsearch adapter #39

Open · wants to merge 1 commit into base: master
471 changes: 471 additions & 0 deletions archipy/adapters/elasticsearch/adapters.py

Large diffs are not rendered by default.

449 changes: 449 additions & 0 deletions archipy/adapters/elasticsearch/ports.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions archipy/configs/base_config.py
@@ -13,6 +13,7 @@
DatetimeConfig,
ElasticSearchAPMConfig,
ElasticSearchConfig,
ElasticsearchConfig,
EmailConfig,
FastAPIConfig,
FileConfig,
@@ -156,6 +157,7 @@ def settings_customise_sources(
STARROCKS_SQLALCHEMY: StarrocksSQLAlchemyConfig = StarrocksSQLAlchemyConfig()
POSTGRES_SQLALCHEMY: PostgresSQLAlchemyConfig = PostgresSQLAlchemyConfig()
SQLITE_SQLALCHEMY: SqliteSQLAlchemyConfig = SqliteSQLAlchemyConfig()
ELASTICSEARCH: ElasticsearchConfig = ElasticsearchConfig()

def customize(self) -> None:
"""Customize configuration after loading.
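With the new ELASTICSEARCH field registered alongside the other adapter configs, a project-level config can override it in the usual way. A minimal sketch, assuming the ElasticsearchConfig fields defined in config_template.py below; AppConfig and the host value are hypothetical, not part of this PR's diff:

from archipy.configs.base_config import BaseConfig
from archipy.configs.config_template import ElasticsearchConfig


class AppConfig(BaseConfig):
    # Hypothetical override of the new ELASTICSEARCH section; the field names
    # come from the ElasticsearchConfig class added in this PR.
    ELASTICSEARCH: ElasticsearchConfig = ElasticsearchConfig(
        HOSTS=["http://localhost:9200"],
    )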
40 changes: 40 additions & 0 deletions archipy/configs/config_template.py
@@ -770,3 +770,43 @@ class DatetimeConfig(BaseModel):
REQUEST_TIMEOUT: int = 5
MAX_RETRIES: int = 3
CACHE_TTL: int = 86400 # TTL for cache in seconds (24 hours)


class ElasticsearchConfig(BaseModel):
"""Configuration settings for Elasticsearch integration.

Controls Elasticsearch server connection parameters, authentication, and client behavior settings.
This configuration supports both basic authentication and API key authentication methods.

Attributes:
HOSTS (list[str]): List of Elasticsearch host URLs (e.g., ['http://localhost:9200']).
API_KEY (str | None): API key for authentication. If provided, takes precedence over username/password.
USERNAME (str | None): Username for basic authentication.
PASSWORD (str | None): Password for basic authentication.
CLIENT_CERT (str | None): Path to client certificate file for SSL/TLS authentication.
CLIENT_KEY (str | None): Path to client private key file for SSL/TLS authentication.
CA_CERTS (str | None): Path to CA certificates file for SSL/TLS verification.
REQUEST_TIMEOUT (float | None): Timeout in seconds for requests. Defaults to 10.0.
MAX_RETRIES (int): Maximum number of retries for failed requests. Defaults to 0 (no retries).
RETRY_ON_STATUS (tuple[int, ...]): HTTP status codes that should trigger a retry.
Defaults to (429, 502, 503, 504) for rate limiting and server errors.
RETRY_ON_TIMEOUT (bool): Whether to retry on timeout. Defaults to False.
HTTP_COMPRESS (bool): Whether to enable HTTP compression. Defaults to False.
CONNECTIONS_PER_NODE (int): Maximum number of connections per node. Defaults to 10.
VERIFY_CERTS (bool): Whether to verify SSL certificates. Defaults to True.
"""

HOSTS: list[str] = []
API_KEY: str | None = None
USERNAME: str | None = None
PASSWORD: str | None = None
CLIENT_CERT: str | None = None
CLIENT_KEY: str | None = None
CA_CERTS: str | None = None
REQUEST_TIMEOUT: float | None = 10.0
MAX_RETRIES: int = 0
RETRY_ON_STATUS: tuple[int, ...] = (429, 502, 503, 504)
RETRY_ON_TIMEOUT: bool = False
HTTP_COMPRESS: bool = False
CONNECTIONS_PER_NODE: int = 10
VERIFY_CERTS: bool = True
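These fields map closely onto the elasticsearch-py 8.x client options. A minimal sketch of how an adapter could build a client from this config, assuming the elasticsearch-py 8.x constructor; the build_client helper is hypothetical, since the PR's adapters.py diff is not rendered above:

from elasticsearch import Elasticsearch

from archipy.configs.config_template import ElasticsearchConfig


def build_client(config: ElasticsearchConfig) -> Elasticsearch:
    # API key takes precedence over basic auth, matching the docstring above.
    auth_kwargs: dict = {}
    if config.API_KEY:
        auth_kwargs["api_key"] = config.API_KEY
    elif config.USERNAME and config.PASSWORD:
        auth_kwargs["basic_auth"] = (config.USERNAME, config.PASSWORD)

    return Elasticsearch(
        hosts=config.HOSTS,
        ca_certs=config.CA_CERTS,
        client_cert=config.CLIENT_CERT,
        client_key=config.CLIENT_KEY,
        verify_certs=config.VERIFY_CERTS,
        request_timeout=config.REQUEST_TIMEOUT,
        max_retries=config.MAX_RETRIES,
        retry_on_status=config.RETRY_ON_STATUS,
        retry_on_timeout=config.RETRY_ON_TIMEOUT,
        http_compress=config.HTTP_COMPRESS,
        connections_per_node=config.CONNECTIONS_PER_NODE,
        **auth_kwargs,
    )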
10 changes: 9 additions & 1 deletion features/steps/atomic_transaction_steps.py
@@ -27,7 +27,10 @@

from archipy.adapters.base.sqlalchemy.session_manager_registry import SessionManagerRegistry
from archipy.configs.config_template import SqliteSQLAlchemyConfig
from archipy.helpers.decorators.sqlalchemy_atomic import async_sqlite_sqlalchemy_atomic_decorator, sqlite_sqlalchemy_atomic_decorator
from archipy.helpers.decorators.sqlalchemy_atomic import (
async_sqlite_sqlalchemy_atomic_decorator,
sqlite_sqlalchemy_atomic_decorator,
)
from archipy.models.entities.sqlalchemy.base_entities import BaseEntity
from archipy.models.errors import InternalError

@@ -220,6 +223,7 @@ def step_when_entity_creation_fails_in_atomic(context):
logger.info(f"Attempting to create entity with UUID {test_uuid} (will fail)")

try:

@sqlite_sqlalchemy_atomic_decorator
def create_entity_with_failure():
"""Create an entity but raise an exception to trigger rollback."""
@@ -346,6 +350,7 @@ def inner_atomic():

# Try a failing inner transaction
try:

@sqlite_sqlalchemy_atomic_decorator
def failing_inner_atomic():
"""Inner failing atomic transaction."""
@@ -743,6 +748,7 @@ def step_when_error_triggered_in_atomic(context):
# Test normal exception handling
logger.info("Testing normal exception handling")
try:

@sqlite_sqlalchemy_atomic_decorator
def normal_exception():
entity = TestEntityFactory.create_test_entity()
@@ -758,6 +764,7 @@ def normal_exception():
# Test deadlock handling (simulated for SQLite)
logger.info("Testing deadlock exception handling")
try:

@sqlite_sqlalchemy_atomic_decorator
def deadlock_exception():
entity = TestEntityFactory.create_test_entity()
@@ -1006,6 +1013,7 @@ async def step_when_async_entity_creation_fails(context):
logger.info(f"Creating async entity with UUID {test_uuid} (will fail)")

try:

@async_sqlite_sqlalchemy_atomic_decorator
async def create_entity_with_failure():
"""Create an entity but raise an exception to trigger rollback."""
5 changes: 4 additions & 1 deletion features/steps/kafka_adapter_steps.py
@@ -20,7 +20,10 @@ def get_kafka_admin_adapter(context):
def get_kafka_producer_adapter(context, topic_name):
"""Get or initialize the Kafka producer adapter."""
scenario_context = get_current_scenario_context(context)
if not hasattr(scenario_context, f"producer_{topic_name}") or getattr(scenario_context, f"producer_{topic_name}") is None:
if (
not hasattr(scenario_context, f"producer_{topic_name}")
or getattr(scenario_context, f"producer_{topic_name}") is None
):
test_config = scenario_context.get("test_config")
context.logger.info(f"Initializing Kafka producer for topic: {topic_name}")
producer = KafkaProducerAdapter(topic_name, kafka_configs=test_config.KAFKA)