Skip to content

Commit 354a735

Browse files
committed
⚗️ (procrastinate) Try to be async to avoid deadlocks
1 parent c93de10 commit 354a735

File tree

4 files changed

+167
-152
lines changed

4 files changed

+167
-152
lines changed

ingestors/tasks.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import gc
23
import logging
34
from pathlib import Path
@@ -7,7 +8,7 @@
78
from openaleph_procrastinate import defer
89
from openaleph_procrastinate.app import make_app
910
from openaleph_procrastinate.model import DatasetJob
10-
from openaleph_procrastinate.tasks import task
11+
from openaleph_procrastinate.tasks import async_task
1112
from prometheus_client import Info
1213
from servicelayer.archive.util import ensure_path
1314

@@ -24,28 +25,38 @@
2425
log = logging.getLogger(__name__)
2526

2627

27-
@task(app=app)
28-
def ingest(job: DatasetJob) -> None:
29-
to_analyze: list[EntityProxy] = []
30-
to_index: list[EntityProxy] = []
31-
manager = Manager(sync_app, job.dataset, job.context)
28+
@async_task(app=app)
29+
async def ingest(job: DatasetJob) -> None:
30+
def _run_ingest():
31+
to_analyze: list[EntityProxy] = []
32+
to_index: list[EntityProxy] = []
33+
manager = Manager(sync_app, job.dataset, job.context)
3234

33-
try:
34-
for entity in job.get_entities():
35-
job.log.debug(
36-
f"Ingesting `{entity.first("contentHash")}`", entity=entity.to_dict()
37-
)
38-
manager.ingest_entity(entity)
39-
finally:
40-
manager.close()
35+
try:
36+
for entity in job.get_entities():
37+
job.log.debug(
38+
f"Ingesting `{entity.first("contentHash")}`",
39+
entity=entity.to_dict(),
40+
)
41+
manager.ingest_entity(entity)
42+
finally:
43+
manager.close()
4144

42-
for entity in manager.iterate_emitted():
43-
if entity.schema.is_a("Analyzable"):
44-
to_analyze.append(entity)
45+
for entity in manager.iterate_emitted():
46+
if entity.schema.is_a("Analyzable"):
47+
to_analyze.append(entity)
4548

46-
to_index.append(entity)
49+
to_index.append(entity)
50+
51+
job.log.info(
52+
f"Emitted {len(manager.emitted)} entities.", emitted=manager.emitted
53+
)
54+
55+
return to_analyze, to_index
56+
57+
# Run blocking Manager operations in thread pool
58+
to_analyze, to_index = await asyncio.to_thread(_run_ingest)
4759

48-
job.log.info(f"Emitted {len(manager.emitted)} entities.", emitted=manager.emitted)
4960
if to_analyze:
5061
defer.analyze(app, job.dataset, to_analyze, batch=job.batch, **job.context)
5162
if to_index:

0 commit comments

Comments
 (0)