From c1b6e5d44d3e16155d9480203d84be01ae538295 Mon Sep 17 00:00:00 2001 From: c4lm Date: Fri, 30 Sep 2022 01:48:26 +0400 Subject: [PATCH] NOTASK - add terminate workflow task to the docs revert --- .../sdks/conductor-python/main/README.md | 180 +++++++++++++----- 1 file changed, 130 insertions(+), 50 deletions(-) diff --git a/docs/how-tos/sdks/conductor-python/main/README.md b/docs/how-tos/sdks/conductor-python/main/README.md index cf324c15..be074c7a 100644 --- a/docs/how-tos/sdks/conductor-python/main/README.md +++ b/docs/how-tos/sdks/conductor-python/main/README.md @@ -1,70 +1,150 @@ -# Netflix Conductor SDK +# Netflix Conductor Client SDK -`conductor-python` repository provides the client SDKs to build Task Workers in Python - -## Quick Start +To find out more about Conductor visit: [https://github.com/Netflix/conductor](https://github.com/Netflix/conductor) -1. [Setup conductor-python package](#Setup-conductor-python-package) -2. [Create and run Task Workers](docs/worker/README.md) -3. [Create workflows using Code](docs/workflow/README.md) +`conductor-python` repository provides the client SDKs to build Task Workers in Python -### Setup conductor python package +### Setup Virtual Environment -Create a virtual environment to build your package: ```shell -virtualenv conductor -source conductor/bin/activate +$ virtualenv conductor +$ source conductor/bin/activate ``` - -Get Conductor Python SDK +Install `conductor-python` package ```shell -python3 -m pip install conductor-python +$ python3 -m pip install conductor-python ``` -### Server settings -Everything related to server settings should be done within `Configuration` class, by setting the required parameter when initializing an object, like this: - +### Write a simple worker ```python -configuration = Configuration( - server_api_url='https://play.orkes.io/api', - debug=True -) -``` +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings +from conductor.client.http.models import Task, TaskResult +from conductor.client.http.models.task_result_status import TaskResultStatus +from conductor.client.worker.worker_interface import WorkerInterface + + +class SimplePythonWorker(WorkerInterface): + def execute(self, task: Task) -> TaskResult: + task_result = self.get_task_result_from_task(task) + task_result.add_output_data('key1', 'value') + task_result.add_output_data('key2', 42) + task_result.add_output_data('key3', False) + task_result.status = TaskResultStatus.COMPLETED + return task_result + + def get_polling_interval_in_seconds(self) -> float: + return 1 + + +def main(): + # Point to the Conductor Server + configuration = Configuration( + server_api_url='https://play.orkes.io/api', + debug=True, + authentication_settings=AuthenticationSettings( # Optional if you are using a server that requires authentication + key_id='KEY', + key_secret='SECRET' + ) + ) -* server_api_url : Conductor server address. e.g. `http://localhost:8000/api` if running locally -* debug: `true` for verbose logging `false` to display only the errors + # Add three workers + workers = [ + SimplePythonWorker('python_task_example'), + ] -#### Authentication settings (optional) -Use if your conductor server requires authentication. + # Start the worker processes and wait for their completion + with TaskHandler(workers, configuration) as task_handler: + task_handler.start_processes() + task_handler.join_processes() -##### Access Control Setup -See [Access Control](https://orkes.io/content/docs/getting-started/concepts/access-control) for more details on role based access control with Conductor and generating API keys for your environment. +if __name__ == '__main__': + main() +``` -```python -configuration = Configuration( - authentication_settings=AuthenticationSettings( - key_id='key', - key_secret='secret' - ) -) +Start polling for the work + +```shell +python main.py ``` -### Metrics settings (optional) +See [Using Conductor Playground](https://orkes.io/content/docs/getting-started/playground/using-conductor-playground) +for more details on how to use Playground environment for testing. + + +## Worker Configurations +Worker configuration is handled via `Configuration` object passed when initializing `TaskHandler` + +### Server Configurations +* server_api_url : Conductor server address. e.g. `http://localhost:8000/api` if running locally +* debug: `true` for verbose logging `false` to display only the errors +* authentication_settings: see below +* metrics_settings: see below + +### Metrics Conductor uses [Prometheus](https://prometheus.io/) to collect metrics. +* directory: Directory where to store the metrics +* file_name: File where the metrics are colleted. e.g. `metrics.log` +* update_interval: Time interval in seconds at which to collect metrics into the file + +### Authentication +Use if your conductor server requires authentication +* key_id: Key +* key_secret: Secret for the Key + +## C/C++ Support +Python is great, but at times you need to call into native C/C++ code. +Here is an example how you can do that with Conductor SDK. + +### 1. Export your C++ functions as `extern "C"`: + * C++ function example (sum two integers) + ```cpp + #include + + extern "C" int32_t get_sum(const int32_t A, const int32_t B) { + return A + B; + } + ``` +### 2. Compile and share its library: + * C++ file name: `simple_cpp_lib.cpp` + * Library output name goal: `lib.so` + ```bash + $ g++ -c -fPIC simple_cpp_lib.cpp -o simple_cpp_lib.o + $ g++ -shared -Wl,-install_name,lib.so -o lib.so simple_cpp_lib.o + ``` + +### 3. Use the C++ library in your python worker +You can use the Python library to call native code written in C++. Here is an example that calls native C++ library +from the Python worker. +See [simple_cpp_lib.cpp](src/example/worker/cpp/simple_cpp_lib.cpp) +and [simple_cpp_worker.py](src/example/worker/cpp/simple_cpp_worker.py) for complete working example. + ```python -metrics_settings = MetricsSettings( - directory='/path/to/folder', - file_name='metrics_file_name.extension', - update_interval=0.1, -) +from conductor.client.http.models.task import Task +from conductor.client.http.models.task_result import TaskResult +from conductor.client.http.models.task_result_status import TaskResultStatus +from conductor.client.worker.worker_interface import WorkerInterface +from ctypes import cdll + +class CppWrapper: + def __init__(self, file_path='./lib.so'): + self.cpp_lib = cdll.LoadLibrary(file_path) + + def get_sum(self, X: int, Y: int) -> int: + return self.cpp_lib.get_sum(X, Y) + + +class SimpleCppWorker(WorkerInterface): + cpp_wrapper = CppWrapper() + + def execute(self, task: Task) -> TaskResult: + execution_result = self.cpp_wrapper.get_sum(1, 2) + task_result = self.get_task_result_from_task(task) + task_result.add_output_data( + 'sum', execution_result + ) + task_result.status = TaskResultStatus.COMPLETED + return task_result ``` - -* `directory`: Directory where to store the metrics - * make sure that you have created this folder before, or the program have permission to create it for you -* `file_name`: File where the metrics are stored - * example: `metrics.log` -* `update_interval`: Time interval in seconds to refresh metrics into the file - * example: `0.1` means metrics are updated every 0.1s, or 100ms - -### Next: [Create and run Task Workers](docs/worker/README.md)