-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e6dd00f
commit c1b847a
Showing
4 changed files
with
170 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import xml.etree.ElementTree as ET | ||
from dmn_types import * | ||
from collections import deque | ||
from copy import deepcopy | ||
|
||
class DmnModel(): | ||
def __init__(self, model_path): | ||
self.model_path = model_path | ||
self.decisions = {} | ||
|
||
model_tree = ET.parse(self.model_path) | ||
model_root = model_tree.getroot() | ||
decisions = model_root.findall("dmn:decision", NS) | ||
for decision in decisions: | ||
d = DMN_MAPPINGS["dmn:decision"]() | ||
d.parse(decision) | ||
self.decisions[d._id] = d | ||
|
||
async def create_instance(self, _id, bpmn_input_variables): | ||
instance = DmnInstance(_id, bpmn_input_variables, model = self) | ||
return instance | ||
class DmnInstance(): | ||
def __init__(self, _id, bpmn_input_variables, model): | ||
self._id = _id | ||
self.bpmn_input_variables = bpmn_input_variables | ||
self.model = model | ||
self.decisions = model.decisions | ||
self.decisions_queue = deque(self.sort_required_decision_list()) | ||
|
||
print("Final Decision queue : ",self.decisions_queue) | ||
|
||
def sort_required_decision_list(self): | ||
helper_list = [] | ||
for current, _ in self.model.decisions.items(): | ||
helper_list.append(current) | ||
list_copy = deepcopy(helper_list) | ||
if not self.decisions[current].required_decisions: | ||
helper_list.remove(current) | ||
helper_list.insert(0, current) | ||
continue | ||
for pos,dec in enumerate(list_copy): | ||
#Current is required for decisions in helper list | ||
if current in self.decisions[dec].required_decisions: | ||
#Current is already in good position | ||
if helper_list.index(current) < helper_list.index(dec): | ||
continue | ||
#Put current before decision it is required for | ||
else: | ||
helper_list.remove(current) | ||
helper_list.insert(pos,current) | ||
if dec in self.decisions[current].required_decisions: | ||
#Current is before its required decision... | ||
#I don't think this case is possible, but additional testing is needed | ||
if helper_list.index(current) < helper_list.index(dec): | ||
print("Intervention needed") | ||
return helper_list | ||
|
||
async def run(self): | ||
decisions_queue = deepcopy(self.decisions_queue) | ||
input_variables = deepcopy(self.bpmn_input_variables) | ||
while decisions_queue: | ||
current_decision = decisions_queue.popleft() | ||
current_decision = self.decisions[current_decision] | ||
output = current_decision.run(input_variables) | ||
input_variables = {**output, **input_variables} | ||
return output | ||
|
||
if __name__ == "__main__": | ||
d = DmnModel("models/test_dmn.dmn") | ||
i = DmnInstance(123, {"input_2":"test_2"}, d) | ||
#output = i.run() | ||
#print(output) | ||
#for k,v in d.decisions.items(): | ||
# print(v.decision_table.rules) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
NS = { | ||
"dmn": "https://www.omg.org/spec/DMN/20191111/MODEL/" | ||
} | ||
|
||
DMN_MAPPINGS = {} | ||
|
||
def dmn_tag(tag): | ||
def wrap(object): | ||
object.tag = tag | ||
DMN_MAPPINGS[tag] = object | ||
return object | ||
|
||
return wrap | ||
|
||
class DmnObject(object): | ||
def __repr__(self): | ||
return f"{type(self).__name__}({self.name or self._id})" | ||
|
||
def parse(self, element): | ||
self._id = element.attrib["id"] | ||
self.name = element.attrib["name"] if "name" in element.attrib else None | ||
def run(self): | ||
return True | ||
|
||
@dmn_tag("dmn:decision") | ||
class Decision(DmnObject): | ||
def __init__(self): | ||
self.required_decisions = [] | ||
self.decision_table = None | ||
def parse(self, element): | ||
super(Decision,self).parse(element) | ||
for req_decision in element.findall(".//dmn:requiredDecision", NS): | ||
self.required_decisions.append(req_decision.attrib["href"][1:]) | ||
self.decision_table = DecisionTable() | ||
self.decision_table.parse(element.find("dmn:decisionTable",NS)) | ||
def run(self, variables): | ||
return self.decision_table.run(variables) | ||
|
||
class DecisionTable(DmnObject): | ||
def __init__(self): | ||
self.hit_policy = None | ||
self.input_variables = [] | ||
self.output_names = [] | ||
self.rules = [] | ||
def parse(self,element): | ||
super(DecisionTable, self).parse(element) | ||
self.hit_policy = element.attrib["hitPolicy"] if "hitPolicy" in element.attrib else "UNIQUE" | ||
#The input expression determines the input value of a column | ||
for input_expression in element.findall(".//dmn:inputExpression",NS): | ||
self.input_variables.append(input_expression.find("dmn:text",NS).text) | ||
for output in element.findall("dmn:output",NS): | ||
self.output_names.append(output.attrib["name"]) | ||
for rule in element.findall("dmn:rule",NS): | ||
rule_dict = {"input":{}, "output":{}} | ||
for position, input_entry in enumerate(rule.findall("dmn:inputEntry",NS)): | ||
rule_dict["input"][self.input_variables[position]] = input_entry.find("dmn:text",NS).text | ||
for position, output_entry in enumerate(rule.findall("dmn:outputEntry",NS)): | ||
rule_dict["output"][self.output_names[position]] = output_entry.find("dmn:text",NS).text | ||
self.rules.append(rule_dict) | ||
|
||
@staticmethod | ||
def check_rule(rule, variables): | ||
check_list = [] | ||
for column in rule: | ||
if not rule[column]: | ||
check_list.append(True) | ||
continue | ||
try: | ||
variables[column] | ||
except KeyError: | ||
check_list.append(False) | ||
continue | ||
if rule[column] == variables[column]: | ||
check_list.append(True) | ||
else: | ||
check_list.append(False) | ||
return all(check_list) | ||
|
||
def unique_hit_policy_run(self,variables): | ||
pass | ||
|
||
def first_hit_policy_run(self, variables): | ||
for rule in self.rules: | ||
if self.check_rule(rule["input"],variables): | ||
return rule["output"] | ||
|
||
def run(self, variables): | ||
if self.hit_policy == "UNIQUE": | ||
output = self.unique_hit_policy_run(variables) | ||
if self.hit_policy == "FIRST": | ||
output = self.first_hit_policy_run(variables) | ||
return output |