This repository was archived by the owner on Sep 3, 2025. It is now read-only.

Conversation

@wssheldon
Contributor

No description provided.

@wssheldon wssheldon added the enhancement New feature or request label Aug 22, 2024
@wssheldon wssheldon self-assigned this Aug 22, 2024
@wssheldon wssheldon requested a review from kevgliss August 22, 2024 15:50
@kevgliss
Contributor

I'm not convinced we're handling these errors at the right level. Instead, I think we should let the caller handle the threads' failures/exceptions more gracefully, e.g. in the consume CLI function:

import time
from threading import Thread, Event
import logging
import signal

from dispatch.common.utils.cli import install_plugins
from dispatch.project import service as project_service
from dispatch.plugin import service as plugin_service
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.database.core import get_session, get_organization_session

install_plugins()
with get_session() as session:
    organizations = get_all_organizations(db_session=session)

log = logging.getLogger(__name__)

# Replace manager dictionary with an Event
running = Event()
running.set()

workers = []

def _run_consume_with_exception_handling(plugin_slug, organization_slug, project_id, running):
    while running.is_set():
        try:
            _run_consume(plugin_slug, organization_slug, project_id, running)
        except Exception as e:
            log.error(f"Exception in thread for plugin {plugin_slug}: {e}", exc_info=True)
            time.sleep(1)  # Optional: Add a small delay before retrying

def start_worker(plugin_slug, organization_slug, project_id, running):
    t = Thread(
        target=_run_consume_with_exception_handling,
        args=(plugin_slug, organization_slug, project_id, running),
        daemon=True,  # Set thread to daemon
    )
    t.start()
    return t

for organization in organizations:
    with get_organization_session(organization.slug) as session:
        projects = project_service.get_all(db_session=session)
        for project in projects:
            plugins = plugin_service.get_active_instances(
                db_session=session, plugin_type="signal-consumer", project_id=project.id
            )

            if not plugins:
                log.warning(
                    f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
                )

            for plugin in plugins:
                log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}")
                for _ in range(5):  # TODO add plugin.instance.concurrency
                    worker = start_worker(plugin.plugin.slug, organization.slug, project.id, running)
                    workers.append(worker)

def terminate_processes(signum, frame):
    print("Terminating main process...")
    running.clear()  # signal all worker threads to stop
    for worker in workers:
        worker.join(timeout=5)  # bounded join: don't hang forever if a consumer is blocked

signal.signal(signal.SIGINT, terminate_processes)
signal.signal(signal.SIGTERM, terminate_processes)

# Keep the main thread running
while True:
    if not running.is_set():
        print("Main process terminating.")
        break
    time.sleep(1)
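The restart-on-exception loop plus Event-based shutdown suggested above can be sketched in isolation. This is a minimal, self-contained illustration; `flaky_task` is a hypothetical stand-in for `_run_consume`, not code from the Dispatch codebase:

```python
import time
from threading import Thread, Event

def flaky_task(results):
    # Hypothetical stand-in for _run_consume: fails on the first call, then succeeds.
    if not results:
        results.append("failed")
        raise RuntimeError("transient failure")
    results.append("ok")
    time.sleep(0.05)

def run_with_exception_handling(results, running):
    # Mirrors _run_consume_with_exception_handling: restart the task on any
    # exception until the shared Event is cleared by the caller.
    while running.is_set():
        try:
            flaky_task(results)
        except Exception:
            time.sleep(0.01)  # brief backoff before retrying

running = Event()
running.set()
results = []
worker = Thread(target=run_with_exception_handling, args=(results, running), daemon=True)
worker.start()

time.sleep(0.2)         # let the worker fail once and recover
running.clear()         # signal shutdown, as terminate_processes does
worker.join(timeout=1)

print(results[:2])      # → ['failed', 'ok']: the failure was swallowed and the task retried
```

The key property is that the exception never escapes the thread: the caller decides the retry policy, and shutdown is cooperative via the shared `Event` rather than forced termination.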

@wssheldon
Contributor Author

@kevgliss Agreed will add these improvements as well.

@wssheldon
Contributor Author

@kevgliss #5118

@wssheldon wssheldon merged commit 9eba4df into master Aug 22, 2024
@wssheldon wssheldon deleted the refactor/aws-plug branch August 22, 2024 18:35
wssheldon added a commit that referenced this pull request Sep 6, 2024
…rns (#5117)

* refactor(plugins/aws): error handling, logging, types, and early returns

* logs: add specific log message to create instance exception