Use queue_task from servicelayer
stchris committed Jun 25, 2024
1 parent ad80f04 commit 724390d
Showing 3 changed files with 7 additions and 46 deletions.
5 changes: 3 additions & 2 deletions ingestors/manager.py
@@ -15,6 +15,7 @@
from servicelayer.extensions import get_extensions
from sentry_sdk import capture_exception
from servicelayer.cache import get_redis
from servicelayer.taskqueue import queue_task, get_rabbitmq_connection
from followthemoney.helpers import entity_filename
from followthemoney.namespace import Namespace
from prometheus_client import Counter, Histogram
@@ -152,10 +153,10 @@ def auction(self, file_path, entity):
return best_cls

def queue_entity(self, entity):
from ingestors.worker import queue_task

log.debug("Queue: %r", entity)
queue_task(
get_rabbitmq_connection(),
get_redis(),
self.collection_id,
settings.STAGE_INGEST,
self.root_task.job_id,
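For context on the manager.py change: queue_entity no longer imports a queue_task helper from ingestors.worker but calls the shared servicelayer.taskqueue.queue_task, passing the RabbitMQ connection and the Redis connection explicitly as its first two arguments. A minimal sketch of the updated call site follows; the Manager class is a hypothetical stand-in, the settings import path is an assumption, and the trailing context/payload arguments are elided here exactly as in the hunk above.

    import logging

    from servicelayer.cache import get_redis
    from servicelayer.taskqueue import get_rabbitmq_connection, queue_task

    from ingestors import settings  # assumed import path for the ingest-file settings module

    log = logging.getLogger(__name__)


    class Manager:  # hypothetical stand-in for the manager class in ingestors/manager.py
        def __init__(self, collection_id, root_task):
            self.collection_id = collection_id
            self.root_task = root_task

        def queue_entity(self, entity):
            log.debug("Queue: %r", entity)
            queue_task(
                get_rabbitmq_connection(),  # RabbitMQ connection, now passed in explicitly
                get_redis(),                # Redis connection used for task bookkeeping
                self.collection_id,
                settings.STAGE_INGEST,
                self.root_task.job_id,
                # ... trailing arguments unchanged, as in the rest of the hunk
            )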
46 changes: 3 additions & 43 deletions ingestors/worker.py
@@ -1,22 +1,16 @@
import logging
import json
from pprint import pformat # noqa
import uuid
from random import randrange

import pika
from banal import ensure_list
from followthemoney import model
from ftmstore import get_dataset
from prometheus_client import Info
from servicelayer.cache import get_redis
from servicelayer import settings as sls
from servicelayer.taskqueue import (
Worker,
Task,
Dataset,
dataset_from_collection_id,
get_rabbitmq_connection,
queue_task,
)

from ingestors import __version__
@@ -27,42 +21,6 @@
log = logging.getLogger(__name__)


# ToDo: Move to servicelayer??
def queue_task(collection_id, stage, job_id=None, context=None, **payload):
task_id = uuid.uuid4().hex
priority = randrange(1, sls.RABBITMQ_MAX_PRIORITY + 1)
body = {
"collection_id": dataset_from_collection_id(collection_id),
"job_id": job_id,
"task_id": task_id,
"operation": stage,
"context": context,
"payload": payload,
"priority": priority,
}

try:
connection = get_rabbitmq_connection()
channel = connection.channel()
channel.confirm_delivery()
channel.basic_publish(
exchange="",
routing_key=body["operation"],
body=json.dumps(body),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE, priority=priority
),
mandatory=True,
)
dataset = Dataset(
conn=get_redis(), name=dataset_from_collection_id(collection_id)
)
dataset.add_task(task_id, stage)
channel.close()
except Exception:
log.exception(f"Error while queuing task: {task_id}")


SYSTEM = Info("ingestfile_system", "ingest-file system information")
SYSTEM.info({"ingestfile_version": __version__})

@@ -126,6 +84,8 @@ def dispatch_pipeline(self, task: Task, payload):
context["pipeline"] = pipeline

queue_task(
get_rabbitmq_connection(),
get_redis(),
task.collection_id,
next_stage,
task.job_id,
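The local queue_task removed above (building the task body, publishing it to RabbitMQ with persistent delivery, and registering the task in the Redis-backed Dataset) is replaced by the helper imported from servicelayer.taskqueue. Judging only from the two call sites updated in this commit, the shared helper takes the RabbitMQ connection and the Redis connection first, followed by the same arguments the local function accepted; how context and payload are passed beyond that is an assumption here. A hedged caller-side sketch mirroring the updated dispatch_pipeline call:

    from servicelayer.cache import get_redis
    from servicelayer.taskqueue import get_rabbitmq_connection, queue_task


    def queue_next_stage(collection_id, next_stage, job_id, context, payload):
        # Hypothetical helper; mirrors the shape of the dispatch_pipeline call above.
        queue_task(
            get_rabbitmq_connection(),  # replaces the channel handling done by the removed local helper
            get_redis(),                # replaces the explicit Dataset bookkeeping
            collection_id,
            next_stage,
            job_id,
            context=context,  # assumed keyword, matching the removed local signature
            **payload,        # assumed: payload fields forwarded as keyword arguments
        )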
2 changes: 1 addition & 1 deletion requirements.txt
@@ -3,7 +3,7 @@ normality==2.5.0
pantomime==0.6.1
followthemoney==3.5.9
followthemoney-store[postgresql]==3.1.0
servicelayer[google,amazon]==1.23.0rc25
servicelayer[google,amazon]==1.23.0rc26
languagecodes==1.1.1
countrytagger==0.1.2
pyicu==2.12
