Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,11 +882,44 @@ def terminate_processes(signum, frame):
@signals_group.command("process")
def process_signals():
"""Runs a continuous process that does additional processing on newly created signals."""
from sqlalchemy import asc

from dispatch.common.utils.cli import install_plugins
from dispatch.signal.service import main_processing_loop
from dispatch.database.core import SessionLocal, engine, sessionmaker
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.signal import flows as signal_flows
from dispatch.signal.models import SignalInstance

install_plugins()
main_processing_loop()

organizations = get_all_organizations(db_session=SessionLocal())
while True:
for organization in organizations:
schema_engine = engine.execution_options(
schema_translate_map={
None: f"dispatch_organization_{organization.slug}",
}
)
db_session = sessionmaker(bind=schema_engine)()
signal_instances = (
(
db_session.query(SignalInstance)
.filter(SignalInstance.filter_action == None) # noqa
.filter(SignalInstance.case_id == None) # noqa
)
.order_by(asc(SignalInstance.created_at))
.limit(500)
)
for signal_instance in signal_instances:
try:
signal_flows.signal_instance_create_flow(
db_session=db_session,
signal_instance_id=signal_instance.id,
)
except Exception as e:
log.debug(signal_instance)
log.exception(e)
db_session.close()


@dispatch_server.command("slack")
Expand Down