Skip to content

Refactor/Baseline Unit Tests #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,14 @@ appsettings.*.json
*.cer
*.pfx



# pycharm
.idea
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove pycharm based files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove them from the .gitignore? That would add the files to the github repo. We have the .vscode files there, but I usually try to avoid putting IDE specific files especially when there are dev machine specific configurations in them. I can see the value in adding the .vscode files here as they have none dev machine specific configuration in there. Not the case for the pycharm files.

.root-refactor
.sample-refactor
.abstract-yield
.bin

/samples/python_abstract_yield/bin/
/samples/python_durable_bindings/bin/
/samples/python_durable_bindings/.vs/
/samples/python_durable_bindings/obj/
4 changes: 3 additions & 1 deletion azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .orchestrator import Orchestrator
from .models.DurableOrchestrationClient import DurableOrchestrationClient

__all__ = [
'Orchestrator'
'Orchestrator',
'DurableOrchestrationClient'
]
7 changes: 0 additions & 7 deletions azure/durable_functions/durable_orchestration_client.py

This file was deleted.

4 changes: 2 additions & 2 deletions azure/durable_functions/interfaces/ITaskMethods.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

class ITaskMethods:
def __init__(self):
self.all: Callable[List[Task], TaskSet]
self.any: Callable[List[Task], TaskSet]
self.all: Callable[[List[Task]], TaskSet]
self.any: Callable[[List[Task]], TaskSet]
39 changes: 22 additions & 17 deletions azure/durable_functions/models/DurableOrchestrationBindings.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import json
from typing import Dict


class DurableOrchestrationBindings:
def __init__(self):
self.taskHubName: str
self.creationUrls: Dict[str, str]
self.managementUrls: Dict[str, str]
def __init__(self, client_data: str):
context = json.loads(client_data)
self.task_hub_name: str = context.get('taskHubName')
self.creation_urls: Dict[str, str] = context.get('creationUrls')
self.management_urls: Dict[str, str] = context.get('managementUrls')


'''
"taskHubName":"DurableFunctionsHub",
"creationUrls":{
"createNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"createAndWaitOnNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
},
"managementUrls":{
"id":"INSTANCEID",
"statusQueryGetUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"sendEventPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"terminatePostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"rewindPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"purgeHistoryDeleteUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
{
"taskHubName":"DurableFunctionsHub",
"creationUrls":{
"createNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"createAndWaitOnNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
},
"managementUrls":{
"id":"INSTANCEID",
"statusQueryGetUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"sendEventPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"terminatePostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"rewindPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"purgeHistoryDeleteUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
}
}
'''
'''
27 changes: 26 additions & 1 deletion azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import requests
import json
from typing import List

from azure.durable_functions.models import DurableOrchestrationBindings


class DurableOrchestrationClient:

def __init__(self):
def __init__(self, context: str):
self.taskHubName: str

self.uniqueWebhookOrigins: List[str]
Expand All @@ -21,3 +25,24 @@ def __init__(self):
self._showHistoryQueryKey: str = "showHistory"
self._showHistoryOutputQueryKey: str = "showHistoryOutput"
self._showInputQueryKey: str = "showInput"
self._orchestrationBindings: DurableOrchestrationBindings = DurableOrchestrationBindings(context)

def start_new(self,
orchestration_function_name: str,
instance_id: str,
client_input):
request_url = self.get_start_new_url(instance_id, orchestration_function_name)

result = requests.post(request_url, json=self.get_json_input(client_input))
return result

@staticmethod
def get_json_input(client_input):
return json.dumps(client_input) if client_input is not None else None

def get_start_new_url(self, instance_id, orchestration_function_name):
request_url = self._orchestrationBindings.creation_urls['createNewInstancePostUri']
request_url = request_url.replace(self._functionNamePlaceholder, orchestration_function_name)
request_url = request_url.replace(self._instanceIdPlaceholder,
f'/{instance_id}' if instance_id is not None else '')
return request_url
49 changes: 31 additions & 18 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
from datetime import datetime
import json
import logging
from typing import List, Any, Dict

from dateutil.parser import parse as dt_parse

from . import (RetryOptions)
from .history import HistoryEvent, HistoryEventType
from ..interfaces import IAction
from ..interfaces import ITaskMethods
from . import (Task, RetryOptions)
from ..models.Task import Task
from ..tasks import call_activity, task_all


class DurableOrchestrationContext:

def __init__(self,
instanceId,
isReplaying,
parentInstanceId,
callActivity,
task_all,
currentUtcDateTime):
self.instanceId: str = instanceId
self.isReplaying: bool = isReplaying
self.parentInstanceId: str = parentInstanceId
self.callActivity = callActivity
self.task_all = task_all
self.currentUtcDateTime = currentUtcDateTime

# self.currentUtcDateTime: Date
self.currentUtcDateTime: datetime
context_string: str):
context: Dict[str, Any] = json.loads(context_string)
logging.warning(f"!!!Calling orchestrator handle {context}")
self.histories: List[HistoryEvent] = context.get("history")
self.instanceId = context.get("instanceId")
self.isReplaying = context.get("isReplaying")
self.parentInstanceId = context.get("parentInstanceId")
self.callActivity = lambda n, i: call_activity(
state=self.histories,
name=n,
input_=i)
self.task_all = lambda t: task_all(state=self.histories, tasks=t)
self.decision_started_event: HistoryEvent = list(filter(
# HistoryEventType.OrchestratorStarted
lambda e_: e_["EventType"] == HistoryEventType.OrchestratorStarted,
self.histories))[0]
self.currentUtcDateTime = dt_parse(self.decision_started_event["Timestamp"])
self.newGuidCounter = 0
self.actions: List[List[IAction]] = []
self.Task: ITaskMethods

def callActivity(name: str, input=None) -> Task:
def callActivity(name: str, input_=None) -> Task:
raise NotImplementedError("This is a placeholder.")

def callActivityWithRetry(
Expand Down
10 changes: 6 additions & 4 deletions azure/durable_functions/models/OrchestratorState.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ def to_json(self) -> Dict[str, Any]:
action_dict['functionName'] = action_obj.functionName
if hasattr(action_obj, 'input'):
action_dict['input'] = action_obj.input

action_result_list.append(action_dict)
json_dict['actions'].append(action_result_list)
json_dict['output'] = self.output
json_dict['error'] = self.error
json_dict['customStatus'] = self.customStatus
if self.output:
json_dict['output'] = self.output
if self.error:
json_dict['error'] = self.error
if self.customStatus:
json_dict['customStatus'] = self.customStatus
return json_dict

def to_json_string(self) -> str:
Expand Down
2 changes: 2 additions & 0 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@


class Task:
action: IAction

def __init__(self, isCompleted, isFaulted, action,
result=None, timestamp=None, id=None, exc=None):
self.isCompleted: bool = isCompleted
Expand Down
2 changes: 1 addition & 1 deletion azure/durable_functions/models/history/HistoryEvent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ def __init__(self):
self.EventType: HistoryEventType
self.EventId: int
self.IsPlayed: bool
self.Timestamp: datetime
self.Timestamp: str
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why Timestamp is changed to a str type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was receiving an error. Timestamp as json is a string value, a conversion stepis required to get it into a datetime. The dtparse is required/already being used to convert the string to a date prior to processing.

e.g. self.currentUtcDateTime = dtparse(decisionStartedEvent["Timestamp"])

self.IsProcessed: bool = False
Loading