Skip to content

Commit

Permalink
flows: fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kpsherva authored and zzacharo committed Jan 10, 2022
1 parent 8ece7a7 commit 08b6f04
Show file tree
Hide file tree
Showing 17 changed files with 309 additions and 148 deletions.
7 changes: 6 additions & 1 deletion cds/modules/flows/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def json(self):
"""Get flow status."""
if self.model is None:
return None

res = self.model.to_dict()
res.update(
{'tasks': [t.to_dict() for t in self.model.tasks]}
Expand Down Expand Up @@ -253,6 +252,12 @@ def start(self):
self.assemble(self.build)
return self._canvas.apply_async()

def delete(self):
"""Mark the flow as deleted."""
self.response = {'status': 410, 'message': 'Gone.'}
self.response_code = 410
db.session.commit()

def stop(self):
"""Stop the flow."""
for task in self.model.tasks:
Expand Down
26 changes: 26 additions & 0 deletions cds/modules/webhooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2021 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""CDS Webhooks."""

from __future__ import absolute_import, print_function

from .ext import CDSFlows

__all__ = ('CDSFlows', )
15 changes: 13 additions & 2 deletions cds/modules/webhooks/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from cds.modules.flows.api import Flow
from cds.modules.webhooks.errors import ReceiverDoesNotExist, InvalidPayload, \
WebhooksError
from cds.modules.webhooks.proxies import current_flows
from cds.modules.webhooks.receivers import AVCWorkflow
from flask_login import current_user
from flask import request, jsonify
from flask import request, jsonify, current_app


def pass_user_id(f):
Expand All @@ -27,7 +28,7 @@ def inner(self, receiver_id=None, *args, **kwargs):


def pass_flow(f):
"""Decorator to retrieve event."""
"""Decorator to retrieve flow."""
@wraps(f)
def inner(self, receiver_id=None, flow_id=None, *args, **kwargs):
flow = FlowModel.query.filter_by(
Expand All @@ -39,6 +40,16 @@ def inner(self, receiver_id=None, flow_id=None, *args, **kwargs):
return inner


def pass_receiver(f):
"""Decorator to retrieve flow controler."""
@wraps(f)
def inner(self, receiver_id=None, *args, **kwargs):
receiver = current_flows.receivers[receiver_id]
kwargs.update(receiver=receiver)
return f(self, *args, **kwargs)
return inner


def need_receiver_permission(action_name):
"""Decorator for actions on receivers.
Expand Down
50 changes: 50 additions & 0 deletions cds/modules/webhooks/ext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2021 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""CDS Fixtures."""

from __future__ import absolute_import, print_function
import pkg_resources


class CDSFlows(object):
"""CDS fixtures extension."""

def __init__(self, app=None, entry_point_group="cds_flows.receivers"):
"""Extension initialization."""
self.receivers = {}

if entry_point_group:
self.load_entry_point_group(entry_point_group)
if app:
self.init_app(app)

def init_app(self, app):
"""Flask application initialization."""
app.extensions['cds-flows'] = self

def load_entry_point_group(self, entry_point_group):
"""Load actions from an entry point group."""
for ep in pkg_resources.iter_entry_points(group=entry_point_group):
self.register(ep.name, ep.load())

def register(self, receiver_id, receiver):
"""Register a receiver."""
assert receiver_id not in self.receivers
self.receivers[receiver_id] = receiver(receiver_id)
32 changes: 32 additions & 0 deletions cds/modules/webhooks/proxies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2021 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Helper proxy to the state object."""

from flask import current_app
from werkzeug.local import LocalProxy

current_flows = LocalProxy(
lambda: current_app.extensions['cds-flows']
)
13 changes: 11 additions & 2 deletions cds/modules/webhooks/receivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def _update_event_bucket(flow):
class CeleryAsyncReceiver(object):
"""Celery Async Receiver abstract class."""

def __init__(self, receiver_id):
assert self.receiver_id == receiver_id
super(CeleryAsyncReceiver, self).__init__()

def __call__(self):
"""Proxy to ``self.run`` method."""
return self.run()
Expand Down Expand Up @@ -133,9 +137,9 @@ class AVCWorkflow(CeleryAsyncReceiver):

receiver_id = 'avc'

def __init__(self, *args, **kwargs):
def __init__(self, receiver_id, *args, **kwargs):
"""Init."""
super(AVCWorkflow, self).__init__(*args, **kwargs)
super(AVCWorkflow, self).__init__(receiver_id, *args, **kwargs)
self._tasks = {
'file_video_metadata_extraction': ExtractMetadataTask,
'file_download': DownloadTask,
Expand Down Expand Up @@ -345,6 +349,11 @@ def run(self, deposit_id, user_id, version_id, bucket_id, key):
# 3. update event response
self._update_flow_response(flow=flow, version_id=version_id)

with db.session.begin_nested():
flow.response.update(_tasks=self.build_flow_json())
flag_modified(event, 'response')
flag_modified(event, 'response_headers')

# 4. persist everything
super(AVCWorkflow, self).persist(flow)

Expand Down
1 change: 0 additions & 1 deletion cds/modules/webhooks/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from celery import states
from collections import defaultdict

# from invenio_webhooks.models import Event
from ..flows.models import Flow as FlowModel, Status as FlowStatus
from ..flows.api import Flow

Expand Down
45 changes: 24 additions & 21 deletions cds/modules/webhooks/views.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Document Server.
# Copyright (C) 2016, 2017 CERN.
# Copyright (C) 2016, 2017, 2021 CERN.
#
# CERN Document Server is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -35,7 +35,7 @@
from invenio_oauth2server.models import Scope

from .decorators import pass_flow, pass_user_id, need_receiver_permission, \
error_handler
error_handler, pass_receiver

from .receivers import AVCWorkflow

Expand Down Expand Up @@ -65,15 +65,13 @@ def add_link_header(response, links):
})


def make_response(flow):
def make_response(flow, receiver):
"""Make a response from flow object."""
receiver = AVCWorkflow()
code, message = receiver.serialize_result(flow)
response = jsonify(**flow.response)
response.headers['X-Hub-Event'] = receiver.receiver_id
response = jsonify(message)
flow.response = message
db.session.commit()
response.headers['X-Hub-Delivery'] = flow.id
if message:
response.headers['X-Hub-Info'] = message
add_link_header(response, {'self': url_for(
'.flow_item', receiver_id=receiver.receiver_id, flow_id=flow.id,
_external=True
Expand Down Expand Up @@ -104,8 +102,9 @@ def put(self, user_id, receiver_id, flow, task_id):
@error_handler
@pass_user_id
@pass_flow
@pass_receiver
@need_receiver_permission('delete')
def delete(self, user_id, receiver_id, event, task_id):
def delete(self, user_id, receiver, receiver_id, event, task_id):
"""Handle DELETE request: stop and clean a task."""
# TODO not used?
return '', 400
Expand All @@ -119,10 +118,11 @@ class FlowFeedbackResource(MethodView):
@error_handler
@pass_user_id
@pass_flow
@pass_receiver
@need_receiver_permission('read')
def get(self, user_id, receiver_id, flow):
def get(self, user_id, receiver, receiver_id, flow):
"""Handle GET request: get more flow information."""
code, status = AVCWorkflow().serialize_result(flow)
code, status = receiver.serialize_result(flow)
return json.dumps(status), 200


Expand All @@ -133,10 +133,10 @@ class FlowListResource(MethodView):
@require_oauth_scopes('webhooks:event')
@error_handler
@pass_user_id
@pass_receiver
@need_receiver_permission('create')
def post(self, receiver_id, user_id):
def post(self, receiver_id, receiver, user_id):
"""Handle POST request."""
receiver = AVCWorkflow()
data = receiver.extract_payload()
assert data["deposit_id"]
assert data["version_id"]
Expand All @@ -149,9 +149,9 @@ def post(self, receiver_id, user_id):
bucket_id=data["bucket_id"]
)
db.session.commit()
return make_response(new_flow)
return make_response(new_flow, receiver)

def options(self, receiver_id):
def options(self, receiver_id, receiver):
"""Handle OPTIONS request."""
abort(405)

Expand All @@ -164,34 +164,37 @@ class FlowResource(MethodView):
@error_handler
@pass_user_id
@pass_flow
@pass_receiver
@need_receiver_permission('read')
def get(self, receiver_id, user_id, flow):
def get(self, receiver_id, receiver, user_id, flow):
"""Handle GET request - get flow status."""
return make_response(flow)
return make_response(flow, receiver)

@require_api_auth()
@require_oauth_scopes('webhooks:event')
@error_handler
@pass_user_id
@pass_flow
@pass_receiver
@need_receiver_permission('update')
def put(self, receiver_id, user_id, flow):
def put(self, receiver_id, receiver, user_id, flow):
"""Handle PUT request - restart flow."""
flow.start()
db.session.commit()
return make_response(flow)
return make_response(flow, receiver)

@require_api_auth()
@require_oauth_scopes('webhooks:event')
@error_handler
@pass_user_id
@pass_flow
@pass_receiver
@need_receiver_permission('delete')
def delete(self, receiver_id, user_id, flow):
def delete(self, receiver_id, receiver, user_id, flow):
"""Handle DELETE request."""
flow.delete()
db.session.commit()
return make_response(flow)
return make_response(flow, receiver)


task_item = TaskResource.as_view('task_item')
Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ services:
extends:
file: docker-services.yml
service: es
flower:
extends:
file: docker-services.yml
service: flower
# flower:
# extends:
# file: docker-services.yml
# service: flower
Loading

0 comments on commit 08b6f04

Please sign in to comment.