Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a0fb309
Added autogenerated async client
IgorChvyrov-sm Aug 5, 2025
f53be3f
Added base model adapters
IgorChvyrov-sm Aug 5, 2025
6f77bbb
Updated ruff rules to ignore autogenerated code
IgorChvyrov-sm Aug 5, 2025
81e9fa3
Added base api client adapters
IgorChvyrov-sm Aug 5, 2025
9d6c8a9
Fix imports
IgorChvyrov-sm Aug 5, 2025
883829d
Regenerated api clients to remove /api/
IgorChvyrov-sm Aug 5, 2025
b1a5ef3
Refactor imports
IgorChvyrov-sm Aug 5, 2025
491199b
Implemented Orkes clients
IgorChvyrov-sm Aug 6, 2025
48edf01
Refactoring: run ruff linter
IgorChvyrov-sm Aug 6, 2025
fb2ac48
Added Configuration adapter
IgorChvyrov-sm Aug 6, 2025
6bbe71f
Refactoring: replaced generated Config with adapter
IgorChvyrov-sm Aug 6, 2025
eaaafbe
Refactoring: code cleanup
IgorChvyrov-sm Aug 6, 2025
bf62283
Revert accidental changes in sync client
IgorChvyrov-sm Aug 6, 2025
13bf3f8
Refactoring: replaced API with APIAdapters in OrkesBaseClient
IgorChvyrov-sm Aug 7, 2025
bfd410a
Added async scheduler, schema, secret, task and workflow clients tests
IgorChvyrov-sm Aug 7, 2025
15a3a24
Added async authorization, integration, metadata, prompt clients test…
IgorChvyrov-sm Aug 7, 2025
4c57b88
Models refactoring pt.1
IgorChvyrov-sm Aug 8, 2025
a4f3124
Models refactoring pt.2
IgorChvyrov-sm Aug 8, 2025
eb7c58c
Models refactoring pt.3
IgorChvyrov-sm Aug 9, 2025
17a5520
Models refactoring pt.4
IgorChvyrov-sm Aug 10, 2025
7ff26b3
Models refactoring pt.5
IgorChvyrov-sm Aug 10, 2025
4d5ee40
Make pydatic test package blank
IgorChvyrov-sm Aug 10, 2025
07e0a81
Implementing test suite for Pydantic model adapters pt.1
IgorChvyrov-sm Aug 10, 2025
cd1a8c7
Models tests
IgorChvyrov-sm Aug 10, 2025
cb9fc88
Config refactoring
IgorChvyrov-sm Aug 11, 2025
29e63e3
Revert "Config refactoring"
IgorChvyrov-sm Aug 11, 2025
99e72c6
Async SDK Clients for Orkes Conductor (#322)
IgorChvyrov-sm Aug 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
15 changes: 10 additions & 5 deletions docs/authorization/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,10 @@ authorization_client.remove_user_from_group(group_id, user_id)
Grants a set of accesses to the specified Subject for a given Target.

```python
from conductor.client.http.models.target_ref import TargetRef, TargetType
from conductor.client.http.models.subject_ref import SubjectRef, SubjectType
from conductor.client.http.models.target_ref import TargetRef
from conductor.shared.http.enums.target_type import TargetType
from conductor.client.http.models.subject_ref import SubjectRef
from conductor.shared.http.enums.subject_type import SubjectType
from conductor.client.orkes.models.access_type import AccessType

target = TargetRef(TargetType.WORKFLOW_DEF, "TEST_WORKFLOW")
Expand All @@ -245,7 +247,8 @@ Given the target, returns all permissions associated with it as a Dict[str, List
In the returned dictionary, key is AccessType and value is a list of subjects.

```python
from conductor.client.http.models.target_ref import TargetRef, TargetType
from conductor.client.http.models.target_ref import TargetRef
from conductor.shared.http.enums.target_type import TargetType

target = TargetRef(TargetType.WORKFLOW_DEF, WORKFLOW_NAME)
target_permissions = authorization_client.get_permissions(target)
Expand Down Expand Up @@ -273,8 +276,10 @@ user_permissions = authorization_client.get_granted_permissions_for_user(user_id
Removes a set of accesses from a specified Subject for a given Target.

```python
from conductor.client.http.models.target_ref import TargetRef, TargetType
from conductor.client.http.models.subject_ref import SubjectRef, SubjectType
from conductor.client.http.models.target_ref import TargetRef
from conductor.shared.http.enums.target_type import TargetType
from conductor.client.http.models.subject_ref import SubjectRef
from conductor.shared.http.enums.subject_type import SubjectType
from conductor.client.orkes.models.access_type import AccessType

target = TargetRef(TargetType.WORKFLOW_DEF, "TEST_WORKFLOW")
Expand Down
2 changes: 1 addition & 1 deletion docs/metadata/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ In order to define a workflow, you must provide a `MetadataClient` and a `Workfl

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClie
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
Expand Down
3 changes: 2 additions & 1 deletion docs/schedule/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Scheduler Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_scheduler_client import OrkesSchedulerClient

configuration = Configuration(
Expand Down
3 changes: 2 additions & 1 deletion docs/secret/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Secret Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_secret_client import OrkesSecretClient

configuration = Configuration(
Expand Down
3 changes: 2 additions & 1 deletion docs/task/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Task Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_task_client import OrkesTaskClient

configuration = Configuration(
Expand Down
2 changes: 1 addition & 1 deletion docs/testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A sample unit test code snippet is provided below.

```python
import json
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models.workflow_test_request import WorkflowTestRequest
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
Expand Down
10 changes: 6 additions & 4 deletions docs/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ Quick example below:

```python
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.shared.http.enums import TaskResultStatus


def execute(task: Task) -> TaskResult:
task_result = TaskResult(
Expand All @@ -59,7 +60,7 @@ The class must implement `WorkerInterface` class, which requires an `execute` me

```python
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface

class SimplePythonWorker(WorkerInterface):
Expand Down Expand Up @@ -99,13 +100,14 @@ def python_annotated_task(input) -> object:
Now you can run your workers by calling a `TaskHandler`, example:

```python
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.configuration import Configuration
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.worker.worker import Worker

#### Add these lines if running on a mac####
from multiprocessing import set_start_method

set_start_method('fork')
############################################

Expand Down Expand Up @@ -347,7 +349,7 @@ and [simple_cpp_worker.py](src/example/worker/cpp/simple_cpp_worker.py) for comp
```python
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from ctypes import cdll

Expand Down
3 changes: 2 additions & 1 deletion docs/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Workflow Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient

configuration = Configuration(
Expand Down
File renamed without changes.
70 changes: 70 additions & 0 deletions examples/async/dynamic_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
This is a dynamic workflow that can be created and executed at run time.
dynamic_workflow will run worker tasks get_user_email and send_email in the same order.
For use cases in which the workflow cannot be defined statically, dynamic workflows is a useful approach.
For detailed explanation, https://github.com/conductor-sdk/conductor-python/blob/main/workflows.md
"""

import asyncio

from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration.configuration import Configuration
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
from conductor.asyncio_client.worker.worker_task import worker_task
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow


@worker_task(task_definition_name="get_user_email")
def get_user_email(userid: str) -> str:
return f"{userid}@example.com"


@worker_task(task_definition_name="send_email")
def send_email(email: str, subject: str, body: str):
print(f"sending email to {email} with subject {subject} and body {body}")


async def main():
# defaults to reading the configuration using following env variables
# CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
# CONDUCTOR_AUTH_KEY : API Authentication Key
# CONDUCTOR_AUTH_SECRET: API Auth Secret
api_config = Configuration()
task_handler = TaskHandler(configuration=api_config)
task_handler.start_processes()

async with ApiClient(api_config) as api_client:
clients = OrkesClients(api_client=api_client, configuration=api_config)
workflow_executor = clients.get_workflow_executor()
workflow = AsyncConductorWorkflow(
name="dynamic_workflow", version=1, executor=workflow_executor
)
get_email = get_user_email(
task_ref_name="get_user_email_ref", userid=workflow.input("userid")
)
sendmail = send_email(
task_ref_name="send_email_ref",
email=get_email.output("result"),
subject="Hello from Orkes",
body="Test Email",
)

workflow >> get_email >> sendmail

# Configure the output of the workflow
workflow.output_parameters(
output_parameters={"email": get_email.output("result")}
)

workflow_run = await workflow.execute(workflow_input={"userid": "user_a"})
print(f"\nworkflow output: {workflow_run.output}\n")
print(
f"check the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}"
)

task_handler.stop_processes()


if __name__ == "__main__":
asyncio.run(main())
11 changes: 11 additions & 0 deletions examples/async/helloworld/greetings_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
This file contains a Simple Worker that can be used in any workflow.
For detailed information https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-2-write-worker
"""

from conductor.asyncio_client.worker.worker_task import worker_task


@worker_task(task_definition_name="greet")
def greet(name: str) -> str:
return f"Hello {name}"
17 changes: 17 additions & 0 deletions examples/async/helloworld/greetings_workflow.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "greetings",
"description": "Sample greetings workflow",
"version": 1,
"tasks": [
{
"name": "greet",
"taskReferenceName": "greet_ref",
"type": "SIMPLE",
"inputParameters": {
"name": "${workflow.input.name}"
}
}
],
"timeoutPolicy": "TIME_OUT_WF",
"timeoutSeconds": 60
}
20 changes: 20 additions & 0 deletions examples/async/helloworld/greetings_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
For detailed explanation https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-1-create-a-workflow
"""

from greetings_worker import greet

from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
from conductor.asyncio_client.workflow.executor.workflow_executor import (
AsyncWorkflowExecutor,
)


def greetings_workflow(
workflow_executor: AsyncWorkflowExecutor,
) -> AsyncConductorWorkflow:
name = "greetings"
workflow = AsyncConductorWorkflow(name=name, executor=workflow_executor)
workflow.version = 1
workflow >> greet(task_ref_name="greet_ref", name=workflow.input("name"))
return workflow
50 changes: 50 additions & 0 deletions examples/async/helloworld/helloworld.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio

from greetings_workflow import greetings_workflow

from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration import Configuration
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
from conductor.asyncio_client.workflow.executor.workflow_executor import (
AsyncWorkflowExecutor,
)


async def register_workflow(
workflow_executor: AsyncWorkflowExecutor,
) -> AsyncConductorWorkflow:
workflow = greetings_workflow(workflow_executor=workflow_executor)
await workflow.register(True)
return workflow


async def main():
# points to http://localhost:8080/api by default
api_config = Configuration()
async with ApiClient(api_config) as api_client:
workflow_executor = AsyncWorkflowExecutor(
configuration=api_config, api_client=api_client
)
# Needs to be done only when registering a workflow one-time
workflow = await register_workflow(workflow_executor)

task_handler = TaskHandler(configuration=api_config)
task_handler.start_processes()

workflow_run = await workflow_executor.execute(
name=workflow.name,
version=workflow.version,
workflow_input={"name": "World"},
)

print(f"\nworkflow result: {workflow_run.output}\n")
print(
f"see the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}\n"
)

task_handler.stop_processes()


if __name__ == "__main__":
asyncio.run(main())
Loading