Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 9 additions & 50 deletions src/dispatch/decorators.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from contextlib import _GeneratorContextManager, contextmanager
from functools import wraps
from typing import Any, Callable, List
import inspect
import logging
import time

from sqlalchemy.engine import Engine
from sqlalchemy.orm import scoped_session, Session

from dispatch.metrics import provider as metrics_provider
from dispatch.organization import service as organization_service
Expand All @@ -23,67 +20,30 @@ def fullname(o):
return f"{module.__name__}.{o.__qualname__}"


@contextmanager
def _session(
engine: Engine,
is_scoped: bool = False,
) -> None:
"""Provide a transactional scope around a series of operations."""
session = (
sessionmaker(bind=engine)()
if not is_scoped
else scoped_session(sessionmaker(bind=engine))()
)
try:
yield session
session.commit()
except Exception:
session.rollback()
raise Exception from None
finally:
session.close()


def _execute_task_in_project_context(
func: Callable,
is_scoped: bool = False,
*args,
**kwargs,
) -> None:
db_session = SessionLocal()
metrics_provider.counter("function.call.counter", tags={"function": fullname(func)})
start = time.perf_counter()

def __execute_task_within_session_context(
schema_session: _GeneratorContextManager[Session],
) -> None:
kwargs["db_session"] = schema_session
for project in project_service.get_all(db_session=schema_session):
project = schema_session.merge(project)
kwargs["project"] = project
try:
func(*args, **kwargs)
except Exception as e:
log.exception(e)

try:
# iterate for all schema
for organization in organization_service.get_all(db_session=db_session):
schema_engine = engine.execution_options(
schema_translate_map={None: f"dispatch_organization_{organization.slug}"}
)
if not is_scoped:
with _session(
engine=schema_engine,
is_scoped=False,
) as __session:
__execute_task_within_session_context(__session)
else:
with _session(
engine=schema_engine,
is_scoped=True,
) as __session:
__execute_task_within_session_context(__session)
schema_session = sessionmaker(bind=schema_engine)()
kwargs["db_session"] = schema_session
for project in project_service.get_all(db_session=schema_session):
kwargs["project"] = project
try:
func(*args, **kwargs)
except Exception as e:
log.exception(e)
schema_session.close()

elapsed_time = time.perf_counter() - start
metrics_provider.timer(
Expand All @@ -109,7 +69,6 @@ def wrapper(*args, **kwargs):
func,
*args,
**kwargs,
is_scoped=False,
)

return wrapper
Expand Down