diff --git a/tests/functional/application-packages/EchoResultsTester/deploy.yml b/tests/functional/application-packages/EchoResultsTester/deploy.yml index 6aad038e8..2fd8a87fe 100644 --- a/tests/functional/application-packages/EchoResultsTester/deploy.yml +++ b/tests/functional/application-packages/EchoResultsTester/deploy.yml @@ -1,6 +1,7 @@ # YAML representation supported by WeaverClient processDescription: process: + id: EchoResultsTester version: "1.0" # must be string, avoid interpretation as float executionUnit: # note: This does not work by itself! The test suite injects the file dynamically. diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 35e9a2d62..48db775aa 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -1091,6 +1091,7 @@ class WorkflowTestCase(WorkflowTestRunnerBase): WorkflowProcesses.APP_DOCKER_STAGE_IMAGES, WorkflowProcesses.APP_ECHO, WorkflowProcesses.APP_ECHO_OPTIONAL, + WorkflowProcesses.APP_ECHO_RESULTS_TESTER, WorkflowProcesses.APP_ECHO_SECRETS, WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS, WorkflowProcesses.APP_READ_FILE, @@ -1628,21 +1629,24 @@ def test_workflow_optional_input_propagation(self): data = out_file.read().strip() assert data == "test-message", "output from workflow should match the default resolved from input omission" + @pytest.mark.oap_part3 def test_workflow_ad_hoc_nested_process(self): passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS) echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER) workflow_exec = { - "process": passthrough_process_info.path, + "process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}", "inputs": { "message": { - "process": passthrough_process_info.path, + "process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}", "inputs": { - "process": echo_result_process_info.path, - "inputs": { - "message": "test" - }, - "outputs": {"output_data": {}} + "message": { + "process": f"{self.WEAVER_RESTAPI_URL}{echo_result_process_info.path}", + "inputs": { + "message": "test" + }, + "outputs": {"output_data": {}} + } }, "outputs": {"message": {}} }, diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 0b8cb2fe3..01a4e50a6 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -46,9 +46,9 @@ get_field, ows2json_output_data ) +from weaver.processes.ogc_api_process import OGCAPIRemoteProcess from weaver.processes.types import ProcessType from weaver.processes.utils import get_process -from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import ( @@ -184,7 +184,7 @@ def execute_process(task, job_id, wps_url, headers=None): # prepare inputs job.progress = JobProgress.GET_INPUTS job.save_log(logger=task_logger, message="Fetching job input definitions.") - wps_inputs = parse_wps_inputs(wps_process, job, database=db) + wps_inputs = parse_wps_inputs(wps_process, job, container=db) # prepare outputs job.progress = JobProgress.GET_OUTPUTS @@ -535,7 +535,7 @@ def log_and_save_update_status_handler( db = get_db(container) store = db.get_store(StoreJobs) - def log_and_update_status(message, progress, status, *_, **kwargs): + def log_and_update_status(message, progress=None, status=None, *_, **kwargs): job.save_log(message=message, progress=progress, status=status, **kwargs) store.update_job(job) return log_and_update_status @@ -619,7 +619,7 @@ def parse_wps_inputs(wps_process, job, container=None): ), logger=LOGGER, ) - process = OGCAPIRemoteProcessBase( + process = OGCAPIRemoteProcess( input_value, proc_uri, request=None, @@ -675,7 +675,8 @@ def parse_wps_inputs(wps_process, job, container=None): if input_data is None: job_log_update_status_func( message=f"Removing [{input_id}] data input from execution request, value was 'null'.", - logger=LOGGER, level=logging.WARNING, + logger=LOGGER, + level=logging.WARNING, ) else: wps_inputs.append((input_id, input_data))