From c1b847a1fc63e9069bacccb06cbf29aba268dd29 Mon Sep 17 00:00:00 2001 From: ssimic1144 Date: Wed, 15 Sep 2021 10:05:41 +0200 Subject: [PATCH] Basic DMN parser & execution --- .gitignore | 2 ++ bpmn_model.py | 4 +-- dmn_model.py | 74 +++++++++++++++++++++++++++++++++++++++++ dmn_types.py | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 dmn_model.py create mode 100644 dmn_types.py diff --git a/.gitignore b/.gitignore index d8f548b..d3012c7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,7 @@ models/chatbot_model.bpmn models/test_podproces.bpmn models/test_call_activity.bpmn models/test_business_rule.bpmn +models/test_dmn.dmn +models/test_exe_dmn.dmn py-bpmn-env database/ \ No newline at end of file diff --git a/bpmn_model.py b/bpmn_model.py index 4375564..9c3aacc 100644 --- a/bpmn_model.py +++ b/bpmn_model.py @@ -52,11 +52,11 @@ def __init__(self, model_path): if isinstance(t, ExclusiveGateway): if t.default: self.elements[t.default].default = True - self.elements[t._id] = t - self.process_elements[p._id][t._id] = t if isinstance(t, StartEvent): self.pending.append(t) self.process_pending[p._id].append(t) + self.elements[t._id] = t + self.process_elements[p._id][t._id] = t #Check if there is single deployement subprocess for k,v in self.subprocesses.items(): if v: diff --git a/dmn_model.py b/dmn_model.py new file mode 100644 index 0000000..55faf44 --- /dev/null +++ b/dmn_model.py @@ -0,0 +1,74 @@ +import xml.etree.ElementTree as ET +from dmn_types import * +from collections import deque +from copy import deepcopy + +class DmnModel(): + def __init__(self, model_path): + self.model_path = model_path + self.decisions = {} + + model_tree = ET.parse(self.model_path) + model_root = model_tree.getroot() + decisions = model_root.findall("dmn:decision", NS) + for decision in decisions: + d = DMN_MAPPINGS["dmn:decision"]() + d.parse(decision) + self.decisions[d._id] = d + + async def create_instance(self, _id, bpmn_input_variables): + instance = DmnInstance(_id, bpmn_input_variables, model = self) + return instance +class DmnInstance(): + def __init__(self, _id, bpmn_input_variables, model): + self._id = _id + self.bpmn_input_variables = bpmn_input_variables + self.model = model + self.decisions = model.decisions + self.decisions_queue = deque(self.sort_required_decision_list()) + + print("Final Decision queue : ",self.decisions_queue) + + def sort_required_decision_list(self): + helper_list = [] + for current, _ in self.model.decisions.items(): + helper_list.append(current) + list_copy = deepcopy(helper_list) + if not self.decisions[current].required_decisions: + helper_list.remove(current) + helper_list.insert(0, current) + continue + for pos,dec in enumerate(list_copy): + #Current is required for decisions in helper list + if current in self.decisions[dec].required_decisions: + #Current is already in good position + if helper_list.index(current) < helper_list.index(dec): + continue + #Put current before decision it is required for + else: + helper_list.remove(current) + helper_list.insert(pos,current) + if dec in self.decisions[current].required_decisions: + #Current is before its required decision... + #I don't think this case is possible, but additional testing is needed + if helper_list.index(current) < helper_list.index(dec): + print("Intervention needed") + return helper_list + + async def run(self): + decisions_queue = deepcopy(self.decisions_queue) + input_variables = deepcopy(self.bpmn_input_variables) + while decisions_queue: + current_decision = decisions_queue.popleft() + current_decision = self.decisions[current_decision] + output = current_decision.run(input_variables) + input_variables = {**output, **input_variables} + return output + +if __name__ == "__main__": + d = DmnModel("models/test_dmn.dmn") + i = DmnInstance(123, {"input_2":"test_2"}, d) + #output = i.run() + #print(output) + #for k,v in d.decisions.items(): + # print(v.decision_table.rules) \ No newline at end of file diff --git a/dmn_types.py b/dmn_types.py new file mode 100644 index 0000000..e303b96 --- /dev/null +++ b/dmn_types.py @@ -0,0 +1,92 @@ +NS = { + "dmn": "https://www.omg.org/spec/DMN/20191111/MODEL/" +} + +DMN_MAPPINGS = {} + +def dmn_tag(tag): + def wrap(object): + object.tag = tag + DMN_MAPPINGS[tag] = object + return object + + return wrap + +class DmnObject(object): + def __repr__(self): + return f"{type(self).__name__}({self.name or self._id})" + + def parse(self, element): + self._id = element.attrib["id"] + self.name = element.attrib["name"] if "name" in element.attrib else None + def run(self): + return True + +@dmn_tag("dmn:decision") +class Decision(DmnObject): + def __init__(self): + self.required_decisions = [] + self.decision_table = None + def parse(self, element): + super(Decision,self).parse(element) + for req_decision in element.findall(".//dmn:requiredDecision", NS): + self.required_decisions.append(req_decision.attrib["href"][1:]) + self.decision_table = DecisionTable() + self.decision_table.parse(element.find("dmn:decisionTable",NS)) + def run(self, variables): + return self.decision_table.run(variables) + +class DecisionTable(DmnObject): + def __init__(self): + self.hit_policy = None + self.input_variables = [] + self.output_names = [] + self.rules = [] + def parse(self,element): + super(DecisionTable, self).parse(element) + self.hit_policy = element.attrib["hitPolicy"] if "hitPolicy" in element.attrib else "UNIQUE" + #The input expression determines the input value of a column + for input_expression in element.findall(".//dmn:inputExpression",NS): + self.input_variables.append(input_expression.find("dmn:text",NS).text) + for output in element.findall("dmn:output",NS): + self.output_names.append(output.attrib["name"]) + for rule in element.findall("dmn:rule",NS): + rule_dict = {"input":{}, "output":{}} + for position, input_entry in enumerate(rule.findall("dmn:inputEntry",NS)): + rule_dict["input"][self.input_variables[position]] = input_entry.find("dmn:text",NS).text + for position, output_entry in enumerate(rule.findall("dmn:outputEntry",NS)): + rule_dict["output"][self.output_names[position]] = output_entry.find("dmn:text",NS).text + self.rules.append(rule_dict) + + @staticmethod + def check_rule(rule, variables): + check_list = [] + for column in rule: + if not rule[column]: + check_list.append(True) + continue + try: + variables[column] + except KeyError: + check_list.append(False) + continue + if rule[column] == variables[column]: + check_list.append(True) + else: + check_list.append(False) + return all(check_list) + + def unique_hit_policy_run(self,variables): + pass + + def first_hit_policy_run(self, variables): + for rule in self.rules: + if self.check_rule(rule["input"],variables): + return rule["output"] + + def run(self, variables): + if self.hit_policy == "UNIQUE": + output = self.unique_hit_policy_run(variables) + if self.hit_policy == "FIRST": + output = self.first_hit_policy_run(variables) + return output \ No newline at end of file