Skip to content

Commit

Permalink
[wip] test nested process workflow - fixed up to nested invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Nov 7, 2024
1 parent 19477d5 commit 5005873
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# YAML representation supported by WeaverClient
processDescription:
process:
id: EchoResultsTester
version: "1.0" # must be string, avoid interpretation as float
executionUnit:
# note: This does not work by itself! The test suite injects the file dynamically.
Expand Down
18 changes: 11 additions & 7 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ class WorkflowTestCase(WorkflowTestRunnerBase):
WorkflowProcesses.APP_DOCKER_STAGE_IMAGES,
WorkflowProcesses.APP_ECHO,
WorkflowProcesses.APP_ECHO_OPTIONAL,
WorkflowProcesses.APP_ECHO_RESULTS_TESTER,
WorkflowProcesses.APP_ECHO_SECRETS,
WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS,
WorkflowProcesses.APP_READ_FILE,
Expand Down Expand Up @@ -1628,21 +1629,24 @@ def test_workflow_optional_input_propagation(self):
data = out_file.read().strip()
assert data == "test-message", "output from workflow should match the default resolved from input omission"

@pytest.mark.oap_part3
def test_workflow_ad_hoc_nested_process(self):
passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS)
echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER)

workflow_exec = {
"process": passthrough_process_info.path,
"process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}",
"inputs": {
"message": {
"process": passthrough_process_info.path,
"process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}",
"inputs": {
"process": echo_result_process_info.path,
"inputs": {
"message": "test"
},
"outputs": {"output_data": {}}
"message": {
"process": f"{self.WEAVER_RESTAPI_URL}{echo_result_process_info.path}",
"inputs": {
"message": "test"
},
"outputs": {"output_data": {}}
}
},
"outputs": {"message": {}}
},
Expand Down
11 changes: 6 additions & 5 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
get_field,
ows2json_output_data
)
from weaver.processes.ogc_api_process import OGCAPIRemoteProcess
from weaver.processes.types import ProcessType
from weaver.processes.utils import get_process
from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.store.base import StoreJobs, StoreProcesses
from weaver.utils import (
Expand Down Expand Up @@ -184,7 +184,7 @@ def execute_process(task, job_id, wps_url, headers=None):
# prepare inputs
job.progress = JobProgress.GET_INPUTS
job.save_log(logger=task_logger, message="Fetching job input definitions.")
wps_inputs = parse_wps_inputs(wps_process, job, database=db)
wps_inputs = parse_wps_inputs(wps_process, job, container=db)

# prepare outputs
job.progress = JobProgress.GET_OUTPUTS
Expand Down Expand Up @@ -535,7 +535,7 @@ def log_and_save_update_status_handler(
db = get_db(container)
store = db.get_store(StoreJobs)

def log_and_update_status(message, progress, status, *_, **kwargs):
def log_and_update_status(message, progress=None, status=None, *_, **kwargs):
job.save_log(message=message, progress=progress, status=status, **kwargs)
store.update_job(job)
return log_and_update_status
Expand Down Expand Up @@ -619,7 +619,7 @@ def parse_wps_inputs(wps_process, job, container=None):
),
logger=LOGGER,
)
process = OGCAPIRemoteProcessBase(
process = OGCAPIRemoteProcess(
input_value,
proc_uri,
request=None,
Expand Down Expand Up @@ -675,7 +675,8 @@ def parse_wps_inputs(wps_process, job, container=None):
if input_data is None:
job_log_update_status_func(
message=f"Removing [{input_id}] data input from execution request, value was 'null'.",
logger=LOGGER, level=logging.WARNING,
logger=LOGGER,
level=logging.WARNING,
)
else:
wps_inputs.append((input_id, input_data))
Expand Down

0 comments on commit 5005873

Please sign in to comment.