Skip to content

Commit ba9ca09

Browse files
committed
🐛 (manager) Properly iterate through emitted memory store
1 parent 6f4181b commit ba9ca09

File tree

3 files changed

+8
-5
lines changed

3 files changed

+8
-5
lines changed

ingestors/manager.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import magic
99
from banal import ensure_list
10-
from followthemoney import StatementEntity, model
10+
from followthemoney import DefaultDataset, StatementEntity, model
1111
from followthemoney.helpers import entity_filename
1212
from followthemoney.namespace import Namespace
1313
from ftmq.store.fragments import get_fragments
@@ -107,7 +107,7 @@ def __init__(self, app: App, dataset: str, context: dict[str, Any]):
107107
self.context = context
108108
self.ns = Namespace(self.context["namespace"])
109109
self.work_path = ensure_path(mkdtemp(prefix="ingestor-"))
110-
self.emitted = MemoryStore()
110+
self.emitted = MemoryStore(dataset=DefaultDataset)
111111
self.archive = get_archive()
112112

113113
def make_entity(self, schema, parent=None):
@@ -261,3 +261,6 @@ def delegate(self, ingestor_class, file_path, entity):
261261
def close(self):
262262
self.writer.flush()
263263
remove_directory(self.work_path)
264+
265+
def get_emitted(self) -> list[StatementEntity]:
266+
return list(self.emitted.iterate(dataset=DefaultDataset))

ingestors/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def ingest(job: DatasetJob) -> None:
3939
finally:
4040
manager.close()
4141

42-
emitted = list(manager.emitted.iterate())
42+
emitted = manager.get_emitted()
4343
for entity in emitted:
4444
if entity.schema.is_a("Analyzable"):
4545
to_analyze.append(entity)
@@ -77,6 +77,6 @@ def ingest_path(
7777
manager.queue_entity(entity)
7878
if path.is_dir():
7979
DirectoryIngestor.crawl(manager, path)
80-
emitted = list(manager.emitted.iterate())
80+
emitted = manager.get_emitted()
8181
log.info(f"Emitted {len(emitted)} entities.", emitted=[e.id for e in emitted])
8282
manager.close()

tests/support.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def fixture(self, fixture_path):
6868
return path, entity
6969

7070
def get_emitted(self, schema=None):
71-
entities = list(self.manager.db.iterate())
71+
entities = self.manager.get_emitted()
7272
if schema is not None:
7373
entities = [e for e in entities if e.schema.is_a(schema)]
7474
return entities

0 commit comments

Comments
 (0)