Skip to content

JSON parsing bug #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[flake8]
exclude = .git, __pycache__, build, dist, .eggs, .github, .local, docs/,
samples, .venv*, .env*, .vscode, .idea, investigations, tests/*
max-complexity = 10
docstring-convention = numpy
1 change: 1 addition & 0 deletions azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Base module for the Python Durable functions."""
from pkgutil import extend_path
import typing
__path__: typing.Iterable[str] = extend_path(__path__, __name__)
10 changes: 8 additions & 2 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
"""Base module for the Python Durable functions.

Exposes the different API components intended for public consumption
"""
from .orchestrator import Orchestrator
from .models.DurableOrchestrationClient import DurableOrchestrationClient
from .models.RetryOptions import RetryOptions

__all__ = [
'Orchestrator',
'DurableOrchestrationClient'
]
'DurableOrchestrationClient',
'RetryOptions'
]
1 change: 1 addition & 0 deletions azure/durable_functions/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Constants used to determine the local running context."""
DEFAULT_LOCAL_HOST: str = "localhost:7071"
DEFAULT_LOCAL_ORIGIN: str = f"http://{DEFAULT_LOCAL_HOST}"
3 changes: 3 additions & 0 deletions azure/durable_functions/interfaces/IAction.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Defines the base interface for Actions that need to be executed."""
from ..models.actions import ActionType


class IAction:
"""Defines the base interface for Actions that need to be executed."""

def __init__(self):
"""Create a new Action object."""
actionType: ActionType
4 changes: 4 additions & 0 deletions azure/durable_functions/interfaces/IFunctionContext.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Interface for the Orchestration object exposed to the generator function."""
from ..models import DurableOrchestrationContext


class IFunctionContext:
"""Orchestration object exposed to the generator function."""

def __init__(self, df=None):
"""Create a new orchestration context."""
self.df: DurableOrchestrationContext = df
8 changes: 0 additions & 8 deletions azure/durable_functions/interfaces/ITaskMethods.py

This file was deleted.

3 changes: 1 addition & 2 deletions azure/durable_functions/interfaces/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""Interfaces for durable functions."""
from .IAction import IAction
from .ITaskMethods import ITaskMethods
from .IFunctionContext import IFunctionContext

__all__ = [
'IAction',
'ITaskMethods',
'IFunctionContext'
]
27 changes: 8 additions & 19 deletions azure/durable_functions/models/DurableOrchestrationBindings.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
"""Binding information for durable functions."""
import json
from typing import Dict


class DurableOrchestrationBindings:
"""Binding information.

Provides information relevant to the creation and management of
durable functions.
"""

def __init__(self, client_data: str):
"""Create a new binding object."""
context = json.loads(client_data)
self.task_hub_name: str = context.get('taskHubName')
self.creation_urls: Dict[str, str] = context.get('creationUrls')
self.management_urls: Dict[str, str] = context.get('managementUrls')


'''
{
"taskHubName":"DurableFunctionsHub",
"creationUrls":{
"createNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"createAndWaitOnNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
},
"managementUrls":{
"id":"INSTANCEID",
"statusQueryGetUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"sendEventPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"terminatePostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"rewindPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"purgeHistoryDeleteUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
}
}
'''
78 changes: 53 additions & 25 deletions azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Durable Orchestration Client class definition."""
import requests
import json
from typing import List
Expand All @@ -6,43 +7,70 @@


class DurableOrchestrationClient:
"""Durable Orchestration Client.

def __init__(self, context: str):
self.taskHubName: str

self.uniqueWebhookOrigins: List[str]

# self._axiosInstance: AxiosInstance = None (http client)
Client for starting, querying, terminating and raising events to
orchestration instances.
"""

self._eventNamePlaceholder: str = "{eventName}"
self._functionNamePlaceholder: str = "{functionName}"
self._instanceIdPlaceholder: str = "[/{instanceId}]"
self._reasonPlaceholder: str = "{text}"
def __init__(self, context: str):
"""Create a new Orchestration Client.

self._createdTimeFromQueryKey: str = "createdTimeFrom"
self._createdTimeToQueryKey: str = "createdTimeTo"
self._runtimeStatusQueryKey: str = "runtimeStatus"
self._showHistoryQueryKey: str = "showHistory"
self._showHistoryOutputQueryKey: str = "showHistoryOutput"
self._showInputQueryKey: str = "showInput"
self._orchestrationBindings: DurableOrchestrationBindings = DurableOrchestrationBindings(context)
:param context: The object representing the orchestrationClient input
binding of the Azure function that will use this client.
"""
self.task_hub_name: str
self._uniqueWebHookOrigins: List[str]
self._event_name_placeholder: str = "{eventName}"
self._function_name_placeholder: str = "{functionName}"
self._instance_id_placeholder: str = "[/{instanceId}]"
self._reason_placeholder: str = "{text}"
self._created_time_from_query_key: str = "createdTimeFrom"
self._created_time_to_query_key: str = "createdTimeTo"
self._runtime_status_query_key: str = "runtimeStatus"
self._show_history_query_key: str = "showHistory"
self._show_history_output_query_key: str = "showHistoryOutput"
self._show_input_query_key: str = "showInput"
self._orchestration_bindings: DurableOrchestrationBindings = \
DurableOrchestrationBindings(context)

def start_new(self,
orchestration_function_name: str,
instance_id: str,
client_input):
request_url = self.get_start_new_url(instance_id, orchestration_function_name)
"""Start a new instance of the specified orchestrator function.

If an orchestration instance with the specified ID already exists, the
existing instance will be silently replaced by this new instance.

:param orchestration_function_name: The name of the orchestrator
function to start.
:param instance_id: The ID to use for the new orchestration instance.
If no instanceId is specified, the Durable Functions extension will
generate a random GUID (recommended).
:param client_input: JSON-serializable input value for the orchestrator
function.
:return: The ID of the new orchestration instance.
"""
request_url = self._get_start_new_url(
instance_id,
orchestration_function_name)

result = requests.post(request_url, json=self.get_json_input(client_input))
result = requests.post(request_url, json=self._get_json_input(
client_input))
return result

@staticmethod
def get_json_input(client_input):
def _get_json_input(client_input):
return json.dumps(client_input) if client_input is not None else None

def get_start_new_url(self, instance_id, orchestration_function_name):
request_url = self._orchestrationBindings.creation_urls['createNewInstancePostUri']
request_url = request_url.replace(self._functionNamePlaceholder, orchestration_function_name)
request_url = request_url.replace(self._instanceIdPlaceholder,
f'/{instance_id}' if instance_id is not None else '')
def _get_start_new_url(self, instance_id, orchestration_function_name):
request_url = self._orchestration_bindings.creation_urls[
'createNewInstancePostUri'
]
request_url = request_url.replace(self._function_name_placeholder,
orchestration_function_name)
request_url = request_url.replace(self._instance_id_placeholder,
f'/{instance_id}'
if instance_id is not None else '')
return request_url
140 changes: 117 additions & 23 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,149 @@
"""Defines the Durable Orchestration Context Class Object."""
import json
import logging
import datetime
from typing import List, Any, Dict

from dateutil.parser import parse as dt_parse

from . import (RetryOptions)
from .history import HistoryEvent, HistoryEventType
from ..interfaces import IAction
from ..interfaces import ITaskMethods
from ..models.Task import Task
from ..tasks import call_activity_task, task_all, call_activity_with_retry_task


class DurableOrchestrationContext:
"""Context of the durable orchestration execution.

Parameter data for orchestration bindings that can be used to schedule
function-based activities.
"""

def __init__(self,
context_string: str):
context: Dict[str, Any] = json.loads(context_string)
logging.warning(f"!!!Calling orchestrator handle {context}")
self.histories: List[HistoryEvent] = context.get("history")
self.instanceId = context.get("instanceId")
self.isReplaying = context.get("isReplaying")
self.parentInstanceId = context.get("parentInstanceId")
self._histories: List[HistoryEvent] = context.get("history")
self._instance_id = context.get("instanceId")
self._is_replaying = context.get("isReplaying")
self._parent_instance_id = context.get("parentInstanceId")
self.call_activity = lambda n, i: call_activity_task(
state=self.histories,
name=n,
input_=i)
self.call_activity_with_retry = lambda n, o, i: call_activity_with_retry_task(
state=self.histories,
retry_options=o,
name=n,
input_=i)
self.call_activity_with_retry = \
lambda n, o, i: call_activity_with_retry_task(
state=self.histories,
retry_options=o,
name=n,
input_=i)
self.task_all = lambda t: task_all(state=self.histories, tasks=t)
self.decision_started_event: HistoryEvent = list(filter(
# HistoryEventType.OrchestratorStarted
lambda e_: e_["EventType"] == HistoryEventType.OrchestratorStarted,
self.histories))[0]
self.currentUtcDateTime = dt_parse(self.decision_started_event["Timestamp"])
self.newGuidCounter = 0
self._current_utc_datetime = \
dt_parse(self.decision_started_event["Timestamp"])
self.new_guid_counter = 0
self.actions: List[List[IAction]] = []
self.Task: ITaskMethods

def call_activity(name: str, input_=None) -> Task:
raise NotImplementedError("This is a placeholder.")
def call_activity(self, name: str, input_=None) -> Task:
"""Schedule an activity for execution.

:param name: The name of the activity function to call.
:param input_:The JSON-serializable input to pass to the activity
function.
:return: A Durable Task that completes when the called activity
function completes or fails.
"""
raise NotImplementedError("This is a placeholder.")

def call_activity_with_retry(self,
name: str, retry_options: RetryOptions,
input_=None) -> Task:
"""Schedule an activity for execution with retry options.

:param name: The name of the activity function to call.
:param retry_options: The retry options for the activity function.
:param input_: The JSON-serializable input to pass to the activity
function.
:return: A Durable Task that completes when the called activity
function completes or fails completely.
"""
raise NotImplementedError("This is a placeholder.")

def call_sub_orchestrator(self,
name: str, input_=None,
instance_id: str = None) -> Task:
"""Schedule an orchestration function named `name` for execution.

:param name: The name of the orchestrator function to call.
:param input_: The JSON-serializable input to pass to the orchestrator
function.
:param instance_id: A unique ID to use for the sub-orchestration
instance. If `instanceId` is not specified, the extension will generate
an id in the format `<calling orchestrator instance ID>:<#>`
"""
raise NotImplementedError("This is a placeholder.")

@property
def histories(self):
"""Get running history of tasks that have been scheduled."""
return self._histories

@property
def instance_id(self):
"""Get the ID of the current orchestration instance.

The instance ID is generated and fixed when the orchestrator function
is scheduled. It can be either auto-generated, in which case it is
formatted as a GUID, or it can be user-specified with any format.

:return: The ID of the current orchestration instance.
"""
return self._instance_id

@property
def is_replaying(self):
"""Get the value indicating orchestration replaying itself.

This property is useful when there is logic that needs to run only when
the orchestrator function is _not_ replaying. For example, certain
types of application logging may become too noisy when duplicated as
part of orchestrator function replay. The orchestrator code could check
to see whether the function is being replayed and then issue the log
statements when this value is `false`.

:return: value indicating whether the orchestrator function is
currently replaying
"""
return self._is_replaying

@property
def parent_instance_id(self):
"""Get the ID of the parent orchestration.

The parent instance ID is generated and fixed when the parent
orchestrator function is scheduled. It can be either auto-generated, in
which case it is formatted as a GUID, or it can be user-specified with
any format.
:return: ID of the parent orchestration of the current
sub-orchestration instance
"""
return self._parent_instance_id

def call_activity_with_retry(
name: str, retry_options: RetryOptions, input_=None) -> Task:
raise NotImplementedError("This is a placeholder.")
@property
def current_utc_datetime(self) -> datetime:
"""Get the current date/time.

def callSubOrchestrator(
name: str, input=None, instanceId: str = None) -> Task:
raise NotImplementedError("This is a placeholder.")
This date/time value is derived from the orchestration history. It
always returns the same value at specific points in the orchestrator
function code, making it deterministic and safe for replay.
:return: The current date/time in a way that is safe for use by
orchestrator functions
"""
return self._current_utc_datetime

# TODO: more to port over
@current_utc_datetime.setter
def current_utc_datetime(self, value: datetime):
self._current_utc_datetime = value
Loading