Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
contrib/*.bin
data/archive
data/model_type_prediction.ftz
debug.sqlite3
Expand Down
24 changes: 24 additions & 0 deletions contrib/memory_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
Entrypoint for memray. make sure postgres & redis are running

export REDIS_URL=redis://localhost
export OPENALEPH_DB_URI=postgresql:///openaleph
export FTM_FRAGMENTS_URI=postgresql:///openaleph

then:
opal-procrastinate init-db
memray run contrib/memory_test.py
"""

from pathlib import Path

from openaleph_procrastinate.settings import DeferSettings

from ingestors.tasks import app, ingest_path

path = Path(__file__).parent.parent.resolve() / "tests/fixtures"
defer_settings = DeferSettings()

ingest_path("memory_test", path=path, languages=[])

app.run_worker(queues=[defer_settings.ingest.queue], wait=False)
13 changes: 9 additions & 4 deletions ingestors/media/image.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
from io import BytesIO
from PIL import Image, ExifTags

from followthemoney import model
from PIL import ExifTags, Image

from ingestors.exc import ProcessingException
from ingestors.ingestor import Ingestor
from ingestors.support.ocr import OCRSupport
from ingestors.support.timestamp import TimestampSupport
from ingestors.exc import ProcessingException

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,15 +61,19 @@ def ingest(self, file_path, entity):
with open(file_path, "rb") as fh:
data = fh.read()

image = None
try:
image = Image.open(BytesIO(data))
image.load()
self.extract_exif(image, entity)
languages = self.manager.context.get("languages")
text = self.extract_ocr_text(data, languages=languages)
entity.add("bodyText", text)
except (OSError, IOError, Exception) as err:
raise ProcessingException("Failed to open image: %s" % err)
except Exception as err:
raise ProcessingException("Failed to process image: %s" % err)
finally:
if image is not None:
image.close()

@classmethod
def match(cls, file_path, entity):
Expand Down
5 changes: 3 additions & 2 deletions ingestors/settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from openaleph_procrastinate.settings import OpenAlephSettings
from pydantic_settings import SettingsConfigDict
from servicelayer import env
from servicelayer import settings as sls


Expand All @@ -25,5 +24,7 @@ class Settings(OpenAlephSettings):
"""Headless libreoffice document convert timeout in seconds"""


_settings = OpenAlephSettings()

# Also store cached values in the SQL database
sls.TAGS_DATABASE_URI = env.get("FTM_STORE_URI", env.get("ALEPH_DATABASE_URI"))
sls.TAGS_DATABASE_URI = _settings.fragments_uri
70 changes: 49 additions & 21 deletions ingestors/support/ocr.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import threading
import time
from contextlib import contextmanager
from functools import cache
from hashlib import sha1
from io import BytesIO
Expand Down Expand Up @@ -88,35 +89,62 @@ def configure_engine(self, languages):

def extract_text(self, data, languages=None):
    """Extract text from a binary string of image data via OCR.

    Returns the recognized text, or an empty string when the image
    cannot be opened or the OCR engine fails. The Pillow image handle
    is always closed, even on error.
    """
    image = None
    try:
        image = Image.open(BytesIO(data))
        # Force-decode now so corrupt data fails here, not inside tesseract.
        image.load()
    except Exception as exc:
        log.error("Cannot open image data using Pillow: %s", exc)
        return ""
    try:
        with temp_locale(TESSERACT_LOCALE):
            languages = self.language_list(languages)
            with self.engine(languages) as api:
                # TODO: play with contrast and sharpening the images.
                start_time = time.time()
                api.SetImage(image)
                text = api.GetUTF8Text()
                confidence = api.MeanTextConf()
                end_time = time.time()
                duration = end_time - start_time
                log.info(
                    "w: %s, h: %s, l: %s, c: %s, took: %.5f",
                    image.width,
                    image.height,
                    languages,
                    confidence,
                    duration,
                )
                return text
    except Exception as exc:
        log.error("OCR error: %s", exc)
        return ""
    finally:
        # Release the decoded image buffer regardless of outcome.
        if image is not None:
            image.close()

@contextmanager
def engine(self, languages):
    """Yield a configured OCR engine handle, clearing it on exit."""
    handle = None
    try:
        handle = self.configure_engine(languages)
        yield handle
    finally:
        # If configure_engine raised, handle is still None — nothing to clear.
        # Cleanup failures are logged, never raised, so they cannot mask the
        # original exception from the with-body.
        if handle is not None:
            try:
                handle.Clear()
            except Exception as exc:
                log.warning("Error clearing OCR engine: %s", exc)

def __del__(self):
    """Clean up thread-local OCR resources when the service is destroyed."""
    # NOTE(review): __del__ timing is interpreter-dependent; this is a
    # best-effort release of the thread-local tesseract API handle.
    if hasattr(self.tl, "api") and self.tl.api is not None:
        log.info("Cleaning up OCR engine for current thread")
        try:
            self.tl.api.End()
        except Exception as exc:
            log.warning("Error cleaning up OCR engine: %s", exc)
        finally:
            # Drop the reference even if End() failed, so a broken handle
            # is never reused.
            self.tl.api = None
3 changes: 1 addition & 2 deletions ingestors/support/shell.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import shutil
import subprocess

Expand Down Expand Up @@ -28,7 +27,7 @@ def exec_command(self, command, *args):
cmd.extend([path_string(a) for a in args])
try:
code = subprocess.call(
cmd, timeout=self.COMMAND_TIMEOUT, stdout=open(os.devnull, "wb")
cmd, timeout=self.COMMAND_TIMEOUT, stdout=subprocess.DEVNULL
)
except OSError as ose:
raise ProcessingException("Error: %s" % ose) from ose
Expand Down
25 changes: 14 additions & 11 deletions ingestors/tabular/access.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import os
import io
import csv
import io
import logging
import os
import subprocess
from collections import OrderedDict

from followthemoney import model

from ingestors.exc import ProcessingException
from ingestors.ingestor import Ingestor
from ingestors.support.shell import ShellSupport
from ingestors.support.table import TableSupport
from ingestors.exc import ProcessingException

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,14 +45,16 @@ def generate_rows(self, file_path, table_name):
if mdb_export is None:
raise RuntimeError("mdb-tools is not available")
args = [mdb_export, "-b", "strip", file_path, table_name]
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
output = io.TextIOWrapper(proc.stdout, newline=os.linesep)
headers = None
for row in csv.reader((line for line in output), delimiter=","):
if headers is None:
headers = row
continue
yield OrderedDict(zip(headers, row))
with subprocess.Popen(args, stdout=subprocess.PIPE) as proc:
if proc.stdout is None:
raise ProcessingException("Failed to create subprocess stdout pipe")
output = io.TextIOWrapper(proc.stdout, newline=os.linesep)
headers = None
for row in csv.reader((line for line in output), delimiter=","):
if headers is None:
headers = row
continue
yield OrderedDict(zip(headers, row))

def ingest(self, file_path, entity):
entity.schema = model.get("Workbook")
Expand Down
27 changes: 13 additions & 14 deletions ingestors/tabular/csv.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import io
import csv
import io
import logging

from followthemoney import model

from ingestors.exc import ProcessingException
from ingestors.ingestor import Ingestor
from ingestors.support.table import TableSupport
from ingestors.exc import ProcessingException

log = logging.getLogger(__name__)

Expand All @@ -26,15 +27,13 @@ def ingest(self, file_path, entity):
encoding = self.detect_stream_encoding(fh)
log.debug("Detected encoding [%r]: %s", entity, encoding)

fh = io.open(file_path, "r", encoding=encoding, errors="replace")
try:
sample = fh.read(4096 * 10)
fh.seek(0)
dialect = csv.Sniffer().sniff(sample)
reader = csv.reader(fh, dialect=dialect)
self.emit_row_tuples(entity, reader)
except (Exception, UnicodeDecodeError, csv.Error) as err:
log.warning("CSV error: %s", err)
raise ProcessingException("Invalid CSV: %s" % err) from err
finally:
fh.close()
with io.open(file_path, "r", encoding=encoding, errors="replace") as fh:
try:
sample = fh.read(4096 * 10)
fh.seek(0)
dialect = csv.Sniffer().sniff(sample)
reader = csv.reader(fh, dialect=dialect)
self.emit_row_tuples(entity, reader)
except (Exception, csv.Error) as err:
log.warning("CSV error: %s", err)
raise ProcessingException("Invalid CSV: %s" % err) from err
4 changes: 4 additions & 0 deletions ingestors/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import gc
import logging
from pathlib import Path

Expand Down Expand Up @@ -50,6 +51,9 @@ def ingest(job: DatasetJob) -> None:
if to_index:
defer.index(app, job.dataset, to_index, **job.context)

# FIXME
gc.collect()


def ingest_path(
dataset: str, path: Path, languages: list[str], foreign_id: str | None = None
Expand Down
Loading