Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/conductor/client/http/api/workflow_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3051,4 +3051,131 @@ def update_workflow_and_task_state_with_http_info(self, body, request_id, workfl
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def execute_workflow_with_return_strategy(self, body, name, version, **kwargs): # noqa: E501
"""Execute a workflow synchronously with reactive response # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.execute_workflow_with_return_strategy(body,name,version)
>>> result = thread.get()
:param async_req bool
:param StartWorkflowRequest body: (required)
:param str name: (required)
:param int version: (required)
:param str request_id:
:param str wait_until_task_ref:
:param int wait_for_seconds:
:param str consistency: DURABLE or EVENTUAL
:param str return_strategy: TARGET_WORKFLOW or WAIT_WORKFLOW
:return: WorkflowRun
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.execute_workflow_with_return_strategy_with_http_info(body, name, version, **kwargs) # noqa: E501
else:
(data) = self.execute_workflow_with_return_strategy_with_http_info(body, name, version, **kwargs) # noqa: E501
return data

def execute_workflow_with_return_strategy_with_http_info(self, body, name, version, **kwargs): # noqa: E501
"""Execute a workflow synchronously with reactive response # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.execute_workflow_with_return_strategy_with_http_info(body, name, version, async_req=True)
>>> result = thread.get()
:param async_req bool
:param StartWorkflowRequest body: (required)
:param str name: (required)
:param int version: (required)
:param str request_id:
:param str wait_until_task_ref:
:param int wait_for_seconds:
:param str consistency: DURABLE or EVENTUAL
:param str return_strategy: TARGET_WORKFLOW or WAIT_WORKFLOW
:return: WorkflowRun
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['body', 'name', 'version', 'request_id', 'wait_until_task_ref', 'wait_for_seconds', 'consistency',
'return_strategy', 'async_req', '_return_http_data_only', '_preload_content',
'_request_timeout'] # noqa: E501

params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method execute_workflow" % key
)
params[key] = val
del params['kwargs']
# verify the required parameter 'body' is set
if ('body' not in params or
params['body'] is None):
raise ValueError("Missing the required parameter `body` when calling `execute_workflow`") # noqa: E501
# verify the required parameter 'name' is set
if ('name' not in params or
params['name'] is None):
raise ValueError("Missing the required parameter `name` when calling `execute_workflow`") # noqa: E501
# verify the required parameter 'version' is set
if ('version' not in params or
params['version'] is None):
raise ValueError("Missing the required parameter `version` when calling `execute_workflow`") # noqa: E501

collection_formats = {}

path_params = {}
if 'name' in params:
path_params['name'] = params['name'] # noqa: E501
if 'version' in params:
path_params['version'] = params['version'] # noqa: E501

query_params = []
if 'request_id' in params:
query_params.append(('requestId', params['request_id'])) # noqa: E501
if 'wait_until_task_ref' in params:
query_params.append(('waitUntilTaskRef', params['wait_until_task_ref'])) # noqa: E501
if 'wait_for_seconds' in params:
query_params.append(('waitForSeconds', params['wait_for_seconds'])) # noqa: E501
if 'consistency' in params:
query_params.append(('consistency', params['consistency'])) # noqa: E501
if 'return_strategy' in params:
query_params.append(('returnStrategy', params['return_strategy'])) # noqa: E501

header_params = {}

form_params = []
local_var_files = {}

body_params = None
if 'body' in params:
body_params = params['body']
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
['application/json']) # noqa: E501

# HTTP header `Content-Type`
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
['application/json']) # noqa: E501

# Authentication setting
auth_settings = ['api_key'] # noqa: E501

return self.api_client.call_api(
'/workflow/execute/{name}/{version}', 'POST',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type='SignalResponse', # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)
36 changes: 35 additions & 1 deletion src/conductor/client/orkes/orkes_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import SkipTaskRequest, WorkflowStatus, \
ScrollableSearchResultWorkflowSummary
ScrollableSearchResultWorkflowSummary, SignalResponse
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
Expand Down Expand Up @@ -59,6 +59,40 @@ def execute_workflow(
wait_for_seconds=wait_for_seconds,
)

def execute_workflow_with_return_strategy(
self,
start_workflow_request: StartWorkflowRequest,
request_id: str = None,
wait_until_task_ref: Optional[str] = None,
wait_for_seconds: int = 30,
consistency: Optional[str] = None,
return_strategy: Optional[str] = None
) -> SignalResponse:
"""Execute a workflow synchronously with optional reactive features
Args:
start_workflow_request: StartWorkflowRequest containing workflow details
request_id: Optional request ID for tracking
wait_until_task_ref: Wait until this task reference is reached
wait_for_seconds: How long to wait for completion (default 30)
consistency: Workflow consistency level - 'DURABLE' or 'SYNCHRONOUS' or 'REGION_DURABLE'
return_strategy: Return strategy - 'TARGET_WORKFLOW' or 'BLOCKING_WORKFLOW' or 'BLOCKING_TASK' or 'BLOCKING_TASK_INPUT'
Returns:
WorkflowRun: The workflow execution result
"""
if consistency is None:
consistency = 'DURABLE'
if return_strategy is None:
return_strategy = 'TARGET_WORKFLOW'

return self.workflowResourceApi.execute_workflow_with_return_strategy(body=start_workflow_request,
name=start_workflow_request.name,
version=start_workflow_request.version,
request_id=request_id,
wait_until_task_ref=wait_until_task_ref,
wait_for_seconds=wait_for_seconds,
consistency=consistency,
return_strategy=return_strategy)

def pause_workflow(self, workflow_id: str):
self.workflowResourceApi.pause_workflow(workflow_id)

Expand Down
15 changes: 15 additions & 0 deletions src/conductor/client/workflow/executor/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ def execute_workflow(self, request: StartWorkflowRequest, wait_until_task_ref: s
wait_for_seconds=wait_for_seconds,
)

def execute_workflow_with_return_strategy(self, request: StartWorkflowRequest, wait_until_task_ref: str = None,
wait_for_seconds: int = 10, request_id: str = None,
consistency: str = None,
return_strategy: str = None) -> SignalResponse:
"""Execute a workflow synchronously with optional reactive features"""
if request_id is None:
request_id = str(uuid.uuid4())

return self.workflow_client.execute_workflow_with_return_strategy(start_workflow_request=request,
request_id=request_id,
wait_until_task_ref=wait_until_task_ref,
wait_for_seconds=wait_for_seconds,
consistency=consistency,
return_strategy=return_strategy)

def execute(self, name: str, version: Optional[int] = None, workflow_input: Any = {},
wait_until_task_ref: str = None, wait_for_seconds: int = 10,
request_id: str = None, correlation_id: str = None, domain: str = None) -> WorkflowRun:
Expand Down
14 changes: 13 additions & 1 deletion src/conductor/client/workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Optional, List, Dict

from conductor.client.http.models import WorkflowRun, SkipTaskRequest, WorkflowStatus, \
ScrollableSearchResultWorkflowSummary
ScrollableSearchResultWorkflowSummary, SignalResponse
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
Expand Down Expand Up @@ -44,6 +44,18 @@ def execute_workflow(
) -> WorkflowRun:
pass

@abstractmethod
def execute_workflow_with_return_strategy(
self,
start_workflow_request: StartWorkflowRequest,
request_id: str = None,
wait_until_task_ref: Optional[str] = None,
wait_for_seconds: int = 30,
consistency: Optional[str] = None,
return_strategy: Optional[str] = None
) -> SignalResponse:
pass

@abstractmethod
def pause_workflow(self, workflow_id: str):
pass
Expand Down
Loading