Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
dfaa8fc
Refactor: move AuthenticationSettings MetricsSettings to shared
IgorChvyrov-sm Aug 11, 2025
ddeb1b6
Added telemetry client
IgorChvyrov-sm Aug 12, 2025
25cd24c
Added event client
IgorChvyrov-sm Aug 12, 2025
4d4fdd3
Added worflow client
IgorChvyrov-sm Aug 12, 2025
ccef7b0
Added ai automator worker clients
IgorChvyrov-sm Aug 12, 2025
07b19d0
Refactor: imports
IgorChvyrov-sm Aug 12, 2025
df95dd9
Refactor: ruff fixes
IgorChvyrov-sm Aug 12, 2025
5edc05c
Refactor: linting and formatting
IgorChvyrov-sm Aug 12, 2025
8411762
Refactor: imports
IgorChvyrov-sm Aug 12, 2025
fd825b4
Merge branch 'feature_async_client' into feature_async_client_pt2
IgorChvyrov-sm Aug 12, 2025
2e346e1
Tests: automator, kafka_publish input
IgorChvyrov-sm Aug 12, 2025
b280138
Test: add event client test
IgorChvyrov-sm Aug 12, 2025
8751122
Test: add ai orchestrator test
IgorChvyrov-sm Aug 12, 2025
7d13871
Test: add workflow tests
IgorChvyrov-sm Aug 12, 2025
91b8a10
Test: metrics collector test
IgorChvyrov-sm Aug 12, 2025
46e2153
Test: event and worker tests
IgorChvyrov-sm Aug 12, 2025
a6775bb
Fix imports
IgorChvyrov-sm Aug 13, 2025
ebde4df
Fix tests
IgorChvyrov-sm Aug 13, 2025
81abca1
Fixed dynamic workflow example (resource api manual change)
IgorChvyrov-sm Aug 14, 2025
bdc65fb
Fixing circular imports pt.1
IgorChvyrov-sm Aug 14, 2025
81ac744
Fixing circular imports pt.2
IgorChvyrov-sm Aug 14, 2025
6d1d34e
Fixing circular imports pt.3
IgorChvyrov-sm Aug 14, 2025
e1e8c5d
Refactor: Update Orkes clients to take api_client as argument
IgorChvyrov-sm Aug 14, 2025
a943651
Refactor: resolving annotations
IgorChvyrov-sm Aug 15, 2025
bf63b20
Refactor: orkes clients
IgorChvyrov-sm Aug 15, 2025
18d7d96
Async helloworld example
IgorChvyrov-sm Aug 17, 2025
c8090a7
Async workflow examples
IgorChvyrov-sm Aug 18, 2025
40c306e
Added orkes examples
IgorChvyrov-sm Aug 18, 2025
3e9be6a
Added fixes and missed orkes examples
IgorChvyrov-sm Aug 18, 2025
8be7a34
Added missed from prev commit
IgorChvyrov-sm Aug 18, 2025
a210d3b
Introduced ApiClientAdapter
IgorChvyrov-sm Aug 19, 2025
f58956a
Revert manual api client changes and moved it to adapter
IgorChvyrov-sm Aug 19, 2025
d62e8c6
Added JWT obtaining logic
IgorChvyrov-sm Aug 19, 2025
b975f6c
Fixed tests and ruff linter errors
IgorChvyrov-sm Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion docs/metadata/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ In order to define a workflow, you must provide a `MetadataClient` and a `Workfl

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClie
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
Expand Down
3 changes: 2 additions & 1 deletion docs/schedule/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Scheduler Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_scheduler_client import OrkesSchedulerClient

configuration = Configuration(
Expand Down
3 changes: 2 additions & 1 deletion docs/secret/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Secret Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_secret_client import OrkesSecretClient

configuration = Configuration(
Expand Down
3 changes: 2 additions & 1 deletion docs/task/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Task Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_task_client import OrkesTaskClient

configuration = Configuration(
Expand Down
2 changes: 1 addition & 1 deletion docs/testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A sample unit test code snippet is provided below.

```python
import json
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models.workflow_test_request import WorkflowTestRequest
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
Expand Down
10 changes: 6 additions & 4 deletions docs/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ Quick example below:

```python
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.shared.http.enums import TaskResultStatus


def execute(task: Task) -> TaskResult:
task_result = TaskResult(
Expand All @@ -59,7 +60,7 @@ The class must implement `WorkerInterface` class, which requires an `execute` me

```python
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface

class SimplePythonWorker(WorkerInterface):
Expand Down Expand Up @@ -99,13 +100,14 @@ def python_annotated_task(input) -> object:
Now you can run your workers by calling a `TaskHandler`, example:

```python
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.configuration import Configuration
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.worker.worker import Worker

#### Add these lines if running on a mac####
from multiprocessing import set_start_method

set_start_method('fork')
############################################

Expand Down Expand Up @@ -347,7 +349,7 @@ and [simple_cpp_worker.py](src/example/worker/cpp/simple_cpp_worker.py) for comp
```python
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from ctypes import cdll

Expand Down
3 changes: 2 additions & 1 deletion docs/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## Workflow Client

### Initialization

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient

configuration = Configuration(
Expand Down
70 changes: 70 additions & 0 deletions examples/async/dynamic_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
This is a dynamic workflow that can be created and executed at run time.
dynamic_workflow will run worker tasks get_user_email and send_email in the same order.
For use cases in which the workflow cannot be defined statically, dynamic workflows is a useful approach.
For detailed explanation, https://github.com/conductor-sdk/conductor-python/blob/main/workflows.md
"""

import asyncio

from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration.configuration import Configuration
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
from conductor.asyncio_client.worker.worker_task import worker_task
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow


@worker_task(task_definition_name="get_user_email")
def get_user_email(userid: str) -> str:
return f"{userid}@example.com"


@worker_task(task_definition_name="send_email")
def send_email(email: str, subject: str, body: str):
print(f"sending email to {email} with subject {subject} and body {body}")


async def main():
# defaults to reading the configuration using following env variables
# CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
# CONDUCTOR_AUTH_KEY : API Authentication Key
# CONDUCTOR_AUTH_SECRET: API Auth Secret
api_config = Configuration()
task_handler = TaskHandler(configuration=api_config)
task_handler.start_processes()

async with ApiClient(api_config) as api_client:
clients = OrkesClients(api_client=api_client, configuration=api_config)
workflow_executor = clients.get_workflow_executor()
workflow = AsyncConductorWorkflow(
name="dynamic_workflow", version=1, executor=workflow_executor
)
get_email = get_user_email(
task_ref_name="get_user_email_ref", userid=workflow.input("userid")
)
sendmail = send_email(
task_ref_name="send_email_ref",
email=get_email.output("result"),
subject="Hello from Orkes",
body="Test Email",
)

workflow >> get_email >> sendmail

# Configure the output of the workflow
workflow.output_parameters(
output_parameters={"email": get_email.output("result")}
)

workflow_run = await workflow.execute(workflow_input={"userid": "user_a"})
print(f"\nworkflow output: {workflow_run.output}\n")
print(
f"check the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}"
)

task_handler.stop_processes()


if __name__ == "__main__":
asyncio.run(main())
Empty file.
11 changes: 11 additions & 0 deletions examples/async/helloworld/greetings_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
This file contains a Simple Worker that can be used in any workflow.
For detailed information https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-2-write-worker
"""

from conductor.asyncio_client.worker.worker_task import worker_task


@worker_task(task_definition_name="greet")
def greet(name: str) -> str:
return f"Hello {name}"
17 changes: 17 additions & 0 deletions examples/async/helloworld/greetings_workflow.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "greetings",
"description": "Sample greetings workflow",
"version": 1,
"tasks": [
{
"name": "greet",
"taskReferenceName": "greet_ref",
"type": "SIMPLE",
"inputParameters": {
"name": "${workflow.input.name}"
}
}
],
"timeoutPolicy": "TIME_OUT_WF",
"timeoutSeconds": 60
}
20 changes: 20 additions & 0 deletions examples/async/helloworld/greetings_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
For detailed explanation https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-1-create-a-workflow
"""

from greetings_worker import greet

from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
from conductor.asyncio_client.workflow.executor.workflow_executor import (
AsyncWorkflowExecutor,
)


def greetings_workflow(
workflow_executor: AsyncWorkflowExecutor,
) -> AsyncConductorWorkflow:
name = "greetings"
workflow = AsyncConductorWorkflow(name=name, executor=workflow_executor)
workflow.version = 1
workflow >> greet(task_ref_name="greet_ref", name=workflow.input("name"))
return workflow
50 changes: 50 additions & 0 deletions examples/async/helloworld/helloworld.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio

from greetings_workflow import greetings_workflow

from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration import Configuration
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
from conductor.asyncio_client.workflow.executor.workflow_executor import (
AsyncWorkflowExecutor,
)


async def register_workflow(
workflow_executor: AsyncWorkflowExecutor,
) -> AsyncConductorWorkflow:
workflow = greetings_workflow(workflow_executor=workflow_executor)
await workflow.register(True)
return workflow


async def main():
# points to http://localhost:8080/api by default
api_config = Configuration()
async with ApiClient(api_config) as api_client:
workflow_executor = AsyncWorkflowExecutor(
configuration=api_config, api_client=api_client
)
# Needs to be done only when registering a workflow one-time
workflow = await register_workflow(workflow_executor)

task_handler = TaskHandler(configuration=api_config)
task_handler.start_processes()

workflow_run = await workflow_executor.execute(
name=workflow.name,
version=workflow.version,
workflow_input={"name": "World"},
)

print(f"\nworkflow result: {workflow_run.output}\n")
print(
f"see the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}\n"
)

task_handler.stop_processes()


if __name__ == "__main__":
asyncio.run(main())
Loading