Skip to content

Set Custom Status #110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self,
self._instance_id: str = instanceId
self._is_replaying: bool = isReplaying
self._parent_instance_id: str = parentInstanceId
self._custom_status: Any = None
self._new_uuid_counter: int = 0
self.call_activity = lambda n, i=None: call_activity_task(
state=self.histories,
Expand Down Expand Up @@ -221,6 +222,23 @@ def task_any(self, activities: List[Task]) -> TaskSet:
"""
raise NotImplementedError("This is a placeholder.")

def set_custom_status(self, status: Any):
"""Set the customized orchestration status for your orchestrator function.

This status is also returned by the orchestration client through the get_status API

Parameters
----------
status : str
Customized status provided by the orchestrator
"""
self._custom_status = status

@property
def custom_status(self):
"""Get customized status of current orchestration."""
return self._custom_status

@property
def histories(self):
"""Get running history of tasks that have been scheduled."""
Expand Down
7 changes: 3 additions & 4 deletions azure/durable_functions/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def __init__(self,
:param activity_func: Generator function to orchestrate.
"""
self.fn: Callable[[DurableOrchestrationContext], Iterator[Any]] = activity_func
self.customStatus: Any = None

def handle(self, context: DurableOrchestrationContext):
"""Handle the orchestration of the user defined generator function.
Expand Down Expand Up @@ -60,7 +59,7 @@ def handle(self, context: DurableOrchestrationContext):
is_done=False,
output=None,
actions=self.durable_context.actions,
custom_status=self.customStatus)
custom_status=self.durable_context.custom_status)
suspended = True
continue

Expand All @@ -79,14 +78,14 @@ def handle(self, context: DurableOrchestrationContext):
is_done=True,
output=sie.value,
actions=self.durable_context.actions,
custom_status=self.customStatus)
custom_status=self.durable_context.custom_status)
except Exception as e:
orchestration_state = OrchestratorState(
is_done=False,
output=None, # Should have no output, after generation range
actions=self.durable_context.actions,
error=str(e),
custom_status=self.customStatus)
custom_status=self.durable_context.custom_status)

return orchestration_state.to_json_string()

Expand Down
4 changes: 4 additions & 0 deletions samples/function_chaining_custom_status/.funcignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.git*
.vscode
local.settings.json
test
130 changes: 130 additions & 0 deletions samples/function_chaining_custom_status/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don’t work, or not
# install all needed dependencies.
#Pipfile.lock

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Azure Functions artifacts
bin
obj
appsettings.json
local.settings.json
.python_packages
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logging

def main(key: str) -> str:
"""Activity function performing a specific step in the chain

Parameters
----------
key : str
key to a dictionary that advances the chain

Returns
-------
[int]
value in the dictionary as a result
"""

logging.warning(f"Activity Triggered: {key}")
switch_statement = {}
switch_statement["One"] = "Two"
switch_statement["Two"] = "Three"
try:
if switch_statement[key]:
return f'{switch_statement[key]}'
except KeyError as e:
return f'One'


Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "key",
"type": "activityTrigger",
"direction": "in",
"dataType": "string"
}
],
"disabled": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
"""This function provides the core function chaining orchestration logic
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments here refer to function chaining which we're doing, but I would want them to also highlight that they are setting custom statuses, which I believe the user can see if they click on the returned 'status URL'....just to differentiate this sample's comments from function_chaining

and also sets custom status using which a user can see intermittent status of
the orchestration through get_status client URL.

Parameters
----------
context: DurableOrchestrationContext
This context has the past history
and the durable orchestration API's to chain a set of functions

Returns
-------
final_result: str
Returns the final result after the chain completes

Yields
-------
call_activity: str
Yields at every step of the function chain orchestration logic
"""

# Chained functions - output of a function is passed as
# input to the next function in the chain
r1 = yield context.call_activity("DurableActivity", "Zero")
context.set_custom_status(f'{r1} ->')
r2 = yield context.call_activity("DurableActivity", r1)
context.set_custom_status(f'{r1} -> {r2} ->')
r3 = yield context.call_activity("DurableActivity", r2)
context.set_custom_status(f'{r1} -> {r2} -> {r3}')
return r3

main = df.Orchestrator.create(orchestrator_function)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging
import time
from azure.durable_functions import DurableOrchestrationClient
import azure.functions as func


async def main(req: func.HttpRequest, starter: str, message):
"""This function starts up the orchestrator from an HTTP endpoint

starter: str
A JSON-formatted string describing the orchestration context

message:
An azure functions http output binding, it enables us to establish
an http response.

Parameters
----------
req: func.HttpRequest
An HTTP Request object, it can be used to parse URL
parameters.
"""


function_name = req.route_params.get('functionName')
logging.info(starter)
client = DurableOrchestrationClient(starter)
instance_id = await client.start_new(function_name)
response = client.create_check_status_response(req, instance_id)
message.set(response)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "anonymous",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}",
"methods": [
"post",
"get"
]
},
{
"direction": "out",
"name": "message",
"type": "http"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in",
"datatype": "string"
}
]
}
37 changes: 37 additions & 0 deletions samples/function_chaining_custom_status/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Function Chaining with Custom Status - Sample

This sample demonstrates how to go about implementing the [Function Chaining](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp#chaining) pattern in Python Durable Functions.

It additionally demonstrates how to go about setting intermittent status while an orchestation is executing. This enables a user to monitor the status of the orchestration through a custom message set by the user.

## Usage Instructions

### Create a `local.settings.json` file in this directory
This file stores app settings, connection strings, and other settings used by local development tools. Learn more about it [here](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=windows%2Ccsharp%2Cbash#local-settings-file).
For this sample, you will only need an `AzureWebJobsStorage` connection string, which you can obtain from the Azure portal.

With you connection string, your `local.settings.json` file should look as follows, with `<your connection string>` replaced with the connection string you obtained from the Azure portal:

```json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "<your connection string>",
"FUNCTIONS_WORKER_RUNTIME": "python"
}
}
```

### Run the Sample
To try this sample, run `func host start` in this directory. If all the system requirements have been met, and
after some initialization logs, you should see something like the following:

```bash
Http Functions:

DurableTrigger: [POST,GET] http://localhost:7071/api/orchestrators/{functionName}
```

This indicates that your `DurableTrigger` function can be reached via a `GET` or `POST` request to that URL. `DurableTrigger` starts the function-chaning orchestrator whose name is passed as a parameter to the URL. So, to start the orchestrator, which is named `DurableOrchestration`, make a GET request to `http://127.0.0.1:7071/api/orchestrators/DurableOrchestration`.

And that's it! You should see a JSON response with five URLs to monitor the status of the orchestration. To learn more about this, please read [here](TODO)!
3 changes: 3 additions & 0 deletions samples/function_chaining_custom_status/host.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"version": "2.0"
}
Loading