Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agents: add MEF synchro. #3170

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
agents: add MEF synchronization
* Adds Celery tasks to perform synchronization and contribution
  cleaning.
* Adds a new `RERO_ILS_MEF_SYNC_LOG_DIR` configuration to specify the
  logs location.
* Fixes MEF url configuration for the dojson transformation.
* Returns the most recent contribution in the `get_contribution` method.
* Adds a new parameter for the contribution `update_online` to avoid
  documents indexing.
* Adds a command line interface to perform manual synchronization.
* Refactors the contribution and subject dumpers for document indexing.
* Raises an exception if the contribution $ref cannot be resolved in
  document.
* Updates `rero-invenio-base` to be able to use custom migration scripts
  on workers.

Co-Authored-by: Johnny Mariéthoz <Johnny.Mariethoz@rero.ch>
  • Loading branch information
jma committed Feb 1, 2023
commit bf9363a5bb37245987d90f0efffe2553fdf81b67
1 change: 1 addition & 0 deletions .github/workflows/continuous-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ jobs:
- name: Bootstrap deploy
if: ${{ matrix.dependencies == 'deploy' }}
run: |
poetry run pip list
poetry run ./scripts/bootstrap --ci --deploy E2E=yes
poetry install --no-root --extras sip2

Expand Down
12 changes: 7 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pytest-invenio = ">=1.4.0,<1.4.12"
sentry-sdk = "<1.6.1"
dparse = ">=0.5.2"
Mako = ">=1.2.2"
rero-invenio-base = "^0.1.0"
rero-invenio-base = "^0.2.0"
jsonref = "<1.0.0"
dojson = "^1.4.0"
jsonresolver = "<0.3.2"
Expand Down Expand Up @@ -199,6 +199,7 @@ users = "rero_ils.modules.users.views:blueprint"
apiharvester = "rero_ils.modules.apiharvester.tasks"
collections = "rero_ils.modules.collections.tasks"
documents = "rero_ils.modules.documents.tasks"
contributions = "rero_ils.modules.contributions.tasks"
ebooks = "rero_ils.modules.ebooks.tasks"
holdings = "rero_ils.modules.holdings.tasks"
items = "rero_ils.modules.items.tasks"
Expand Down
9 changes: 9 additions & 0 deletions rero_ils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ def _(x):
'schedule': crontab(minute=0, hour=5), # Every day at 05:00 UTC,
'enabled': False,
},
'sync-agents': {
'task': 'rero_ils.modules.contributions.tasks.sync_agents',
'schedule': crontab(minute=0, hour=1), # Every day at 01:00 UTC,
'enabled': False,
},
# 'mef-harvester': {
# 'task': 'rero_ils.modules.apiharvester.tasks.harvest_records',
# 'schedule': timedelta(minutes=60),
Expand Down Expand Up @@ -2952,6 +2957,10 @@ def _(x):
RERO_ILS_MEF_AGENTS_URL = 'https://mef.rero.ch/api/agents'
RERO_ILS_MEF_RESULT_SIZE = 100

# The absolute path where the agent synchronization logs are written;
# defaults to the instance path.
# RERO_ILS_MEF_SYNC_LOG_DIR = '/var/logs/reroils'

RERO_ILS_APP_HELP_PAGE = (
'https://github.com/rero/rero-ils/wiki/Public-demo-help'
)
Expand Down
5 changes: 3 additions & 2 deletions rero_ils/dojson/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,9 @@ def get_contribution_link(bibid, reroid, id, key):
# In dojson we don't have the app. mef_url should be the same as
# RERO_ILS_MEF_AGENTS_URL in config.py
# https://mef.test.rero.ch/api/agents/mef/?q=rero.rero_pid:A012327677
mef_host = os.environ.get('RERO_ILS_MEF_HOST', 'mef.rero.ch')
mef_url = f'https://{mef_host}/api/agents'
mef_url = os.environ.get(
'RERO_ILS_MEF_AGENTS_URL',
'https://mef.rero.ch/api/agents')
if type(id) is str:
match = re_identified.search(id)
else:
Expand Down
3 changes: 2 additions & 1 deletion rero_ils/modules/cli/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def fixtures():
@click.option('-r', '--reindex', 'reindex', is_flag=True, default=False)
@click.option('-c', '--dbcommit', 'dbcommit', is_flag=True, default=False)
@click.option('-C', '--commit', 'commit', default=100000)
@click.option('-v', '--verbose', 'verbose', is_flag=True, default=True)
@click.option('-v', '--verbose/--no-verbose', 'verbose',
is_flag=True, default=True)
@click.option('-d', '--debug', 'debug', is_flag=True, default=False)
@click.option('-s', '--schema', 'schema', default=None)
@click.option('-t', '--pid_type', 'pid_type', default=None)
Expand Down
49 changes: 35 additions & 14 deletions rero_ils/modules/contributions/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ def get_contribution(cls, ref_type, ref_pid):
"""Get contribution."""
if ref_type == 'mef':
return cls.get_record_by_pid(ref_pid)

es_filter = Q({'term': {f'{ref_type}.pid': ref_pid}})
if ref_type == 'viaf':
query = ContributionsSearch() \
.filter('term', viaf_pid=ref_pid)
else:
query = ContributionsSearch() \
.filter({'term': {f'{ref_type}.pid': ref_pid}})
es_filter = Q('term', viaf_pid=ref_pid)

# in case of multiple results get the most recent
query = ContributionsSearch() \
.params(preserve_order=True) \
.sort({'_created': {'order': 'desc'}})\
.filter(es_filter)

with contextlib.suppress(StopIteration):
pid = next(query.source('pid').scan()).pid
return cls.get_record_by_pid(pid)
Expand Down Expand Up @@ -134,6 +139,25 @@ def get_record_by_ref(cls, ref):
contribution = None
return contribution, online

@classmethod
def remove_schema(cls, data):
    """Strip the ``$schema`` keys from a contribution, in place.

    The key is dropped from the root of ``data`` and from every source
    sub-record that is both listed in the
    ``RERO_ILS_CONTRIBUTIONS_SOURCES`` configuration and present in
    ``data``.

    :param data: dict - the data representation of the current
        contribution.
    :returns: the same ``data`` object, once modified.
    :rtype: dict
    """
    # Root level first, then each configured source sub-record.
    data.pop('$schema', None)
    source_names = current_app.config.get(
        'RERO_ILS_CONTRIBUTIONS_SOURCES', [])
    for name in source_names:
        if name not in data:
            continue
        data[name].pop('$schema', None)
    return data

@classmethod
def _get_mef_data_by_type(cls, pid, pid_type, verbose=False,
with_deleted=True, resolve=True, sources=True):
Expand Down Expand Up @@ -162,13 +186,7 @@ def _get_mef_data_by_type(cls, pid, pid_type, verbose=False,
if status == requests_codes.ok:
try:
data = request.json().get('hits', {}).get('hits', [None])[0]
metadata = data['metadata']
metadata.pop('$schema', None)
sources = current_app.config.get(
'RERO_ILS_CONTRIBUTIONS_SOURCES', [])
for source in sources:
if source in metadata:
metadata[source].pop('$schema', None)
metadata = cls.remove_schema(data['metadata'])
return metadata
except Exception:
msg = f'MEF resolver no metadata: {mef_url}'
Expand Down Expand Up @@ -285,7 +303,10 @@ def documents_ids(self, with_subjects=True, with_subjects_imported=True):
).source('pid')
return [hit.meta.id for hit in search.scan()]

def update_online(self, dbcommit=False, reindex=False, verbose=False):
def update_online(
self, dbcommit=False, reindex=False, verbose=False,
reindex_doc=True
):
"""Update record online.

:param reindex: reindex record by record
Expand Down Expand Up @@ -314,7 +335,7 @@ def update_online(self, dbcommit=False, reindex=False, verbose=False):
elif dict(self) != data:
action = ContributionUpdateAction.REPLACE
self.replace(data=data, dbcommit=dbcommit, reindex=reindex)
if reindex:
if reindex and reindex_doc:
indexer = DocumentsIndexer()
indexer.bulk_index(self.documents_ids())
indexer.process_bulk_queue()
Expand Down
81 changes: 81 additions & 0 deletions rero_ils/modules/contributions/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import click
from flask.cli import with_appcontext

from .sync import SyncAgent
from ..documents.tasks import \
replace_idby_contribution as task_replace_idby_contribution
from ..documents.tasks import \
Expand Down Expand Up @@ -90,3 +91,83 @@ def replace_idby_subjects_imported(verbose, debug, details):
debug=debug,
subjects='subjects_imported'
)


@contribution.command()
@click.option('-q', '--query', default='*')
@click.option('-n', '--dry-run', is_flag=True, default=False)
@click.option('-d', '--from-last-date', is_flag=True, default=False)
@click.option('-v', '--verbose', count=True, default=0)
@click.option('-l', '--log-dir', default=None)
@click.option('-f', '--from-date',
              type=click.DateTime(formats=["%Y-%m-%d"]), default=None)
@click.option('-m', '--in-memory', is_flag=True, default=False)
@with_appcontext
def sync(query, dry_run, from_last_date, verbose, log_dir, from_date,
         in_memory):
    """Synchronize the contributions with MEF.

    In verbose mode the whole run is delegated to the synchronization
    agent; otherwise the contributions are processed one by one behind
    a progress bar and the MEF pids in error are printed at the end.
    """
    agent = SyncAgent(
        dry_run=dry_run, verbose=verbose, log_dir=log_dir,
        from_last_date=from_last_date)
    if verbose:
        # The agent drives the complete synchronization by itself.
        agent.sync(query, from_date)
        return

    agent.start_sync()
    pids, total = agent.get_contributions_pids(query, from_date)
    if in_memory:
        # Materialize every pid up front instead of consuming the
        # iterable lazily while the records are processed.
        pids = list(pids)
    n_updated = 0
    # A set, so that a document touched by several contributions is
    # counted only once.
    doc_updated = set()
    err_pids = []
    with click.progressbar(pids, length=total) as bar:
        for pid in bar:
            current_doc_updated, updated, error = agent.sync_record(pid)
            doc_updated.update(current_doc_updated)
            if updated:
                n_updated += 1
            if error:
                err_pids.append(pid)
    agent.end_sync(len(doc_updated), n_updated, err_pids)
    if err_pids:
        click.secho(f'ERROR: MEF pids: {err_pids}', fg='red')


@contribution.command()
@click.option('-q', '--query', default='*')
@click.option('-n', '--dry-run', is_flag=True, default=False)
@click.option('-v', '--verbose', count=True, default=0)
@click.option('-l', '--log-dir', default=None)
@with_appcontext
def clean(query, dry_run, verbose, log_dir):
    """Remove the unused MEF contribution records.

    In verbose mode the whole run is delegated to the synchronization
    agent; otherwise the contributions are processed one by one behind
    a progress bar and a summary is printed at the end.
    """
    agent = SyncAgent(dry_run=dry_run, verbose=verbose, log_dir=log_dir)
    if verbose:
        # The agent drives the complete cleaning by itself.
        agent.remove_unused(query)
        return

    agent.start_clean()
    pids, total = agent.get_contributions_pids(query)
    n_removed = 0
    err_pids = []
    with click.progressbar(pids, length=total) as bar:
        for pid in bar:
            updated, error = agent.remove_unused_record(pid)
            if updated:
                n_removed += 1
            if error:
                err_pids.append(pid)

    click.secho(f'{n_removed} removed MEF records', fg='green')
    if err_pids:
        click.secho(f'ERROR: MEF pids: {err_pids}', fg='red')


@contribution.command()
@click.option('-c', '--clear', is_flag=True, default=False)
@with_appcontext
def sync_errors(clear):
    """Clear the stored agent synchronization errors (with --clear)."""
    errors = SyncAgent.get_errors()
    # NOTE(review): without --clear the fetched errors are neither
    # displayed nor modified - confirm whether they should be listed.
    if clear:
        SyncAgent.clear_errors()
        click.secho(f'Removed {len(errors)} errors', fg='yellow')
Loading