From b826b683fb2f1044c799a724961bfc6bcb90f1ea Mon Sep 17 00:00:00 2001 From: Friedrich Lindenberg Date: Sun, 3 Nov 2019 22:01:04 +0100 Subject: [PATCH] Fix very large table ingestion, fixes #764 --- aleph/index/entities.py | 1 - aleph/worker.py | 2 +- requirements-toolkit.txt | 2 +- services/ingest-file/ingestors/support/table.py | 4 +++- services/ingest-file/requirements.txt | 2 +- ui/package.json | 2 +- ui/src/viewers/CsvStreamViewer.jsx | 9 +++++++-- 7 files changed, 14 insertions(+), 8 deletions(-) diff --git a/aleph/index/entities.py b/aleph/index/entities.py index c93ecb0739..399467cb2e 100644 --- a/aleph/index/entities.py +++ b/aleph/index/entities.py @@ -96,7 +96,6 @@ def entities_by_ids(ids, schemata=None, cached=False, return index = entities_read_index(schema=schemata) query = {'ids': {'values': ids}} - # query = {'bool': {'filter': query}} query = { 'query': query, '_source': _source_spec(includes, excludes), diff --git a/aleph/worker.py b/aleph/worker.py index 420cafbb0a..99c6b9b995 100644 --- a/aleph/worker.py +++ b/aleph/worker.py @@ -52,7 +52,7 @@ def handle(self, task): if stage.stage == OP_BULKLOAD: bulk_load(stage, collection, payload) if stage.stage == OP_PROCESS: - if payload.get('reset'): + if payload.pop('reset', False): reset_collection(collection, sync=True) process_collection(stage, collection, sync=sync, **payload) if stage.stage == OP_XREF: diff --git a/requirements-toolkit.txt b/requirements-toolkit.txt index c27762324d..27d33ccf7a 100644 --- a/requirements-toolkit.txt +++ b/requirements-toolkit.txt @@ -2,7 +2,7 @@ # Dependencies maintained by OCCRP banal==0.4.2 urlnormalizer==1.2.5 -followthemoney==1.21.5 +followthemoney==1.22.0 fingerprints==0.6.6 servicelayer[google,amazon]==1.9.0 normality==2.0.0 diff --git a/services/ingest-file/ingestors/support/table.py b/services/ingest-file/ingestors/support/table.py index 41f3215426..88767c9cd8 100644 --- a/services/ingest-file/ingestors/support/table.py +++ b/services/ingest-file/ingestors/support/table.py @@ -25,10 +25,12 @@ def emit_row_dicts(self, table, rows, headers=None): csv_writer.writerow(values) self.manager.emit_text_fragment(table, values, row_count) row_count += 1 + if row_count > 0 and row_count % 1000 == 0: + log.info("Table emit [%s]: %s...", table, row_count) if row_count > 0: csv_hash = self.manager.store(csv_path, mime_type=CSV) table.set('csvHash', csv_hash) - table.set('rowCount', row_count + 1) + table.set('rowCount', row_count) table.set('columns', registry.json.pack(headers)) def wrap_row_tuples(self, rows): diff --git a/services/ingest-file/requirements.txt b/services/ingest-file/requirements.txt index cd1ed74321..8a3bb54793 100644 --- a/services/ingest-file/requirements.txt +++ b/services/ingest-file/requirements.txt @@ -3,7 +3,7 @@ normality==2.0.0 pantomime==0.3.3 servicelayer[google,amazon]==1.9.0 balkhash[leveldb,sql]==1.1.1 -followthemoney==1.21.5 +followthemoney==1.22.0 languagecodes==1.0.5 psycopg2-binary==2.8.4 pyicu==2.3.1 diff --git a/ui/package.json b/ui/package.json index aaa84b9f06..9be944a625 100644 --- a/ui/package.json +++ b/ui/package.json @@ -3,7 +3,7 @@ "version": "3.3.4", "private": true, "dependencies": { - "@alephdata/followthemoney": "^1.21.5", + "@alephdata/followthemoney": "^1.22.0", "@blueprintjs/core": "^3.18.1", "@blueprintjs/icons": "3.11.0", "@blueprintjs/select": "^3.8.0", diff --git a/ui/src/viewers/CsvStreamViewer.jsx b/ui/src/viewers/CsvStreamViewer.jsx index 9bf22b2a73..147b7a17b7 100644 --- a/ui/src/viewers/CsvStreamViewer.jsx +++ b/ui/src/viewers/CsvStreamViewer.jsx @@ -93,15 +93,20 @@ class CSVStreamViewer extends React.Component { render() { const { document } = this.props; + const { rows } = this.state; if (document.id === undefined) { return null; } + const numRows = parseInt(document.getFirst('rowCount'), 10); const columnsJson = document.getFirst('columns'); - const columns = columnsJson ? JSON.parse(columnsJson) : []; + const columnsFtm = columnsJson ? JSON.parse(columnsJson) : []; + // HACK: Use the first row of the data as headers if nothing is in the + // FtM metadata. + const columns = columnsFtm.length || (rows.length > 0 ? rows[0] : []); return (