Skip to content

Commit

Permalink
Multi model execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikola Tankovic committed Oct 2, 2021
1 parent ece15a1 commit 280a126
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 39 deletions.
24 changes: 19 additions & 5 deletions bpmn_model.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from types import SimpleNamespace
import xml.etree.ElementTree as ET
from bpmn_types import *
from pprint import pprint
Expand All @@ -9,7 +10,13 @@
from datetime import datetime
import os
from uuid import uuid4
import json
import env

instance_models = {}


def get_model_for_instance(iid):
return instance_models.get(iid, None)


class UserFormMessage:
Expand All @@ -29,7 +36,7 @@ def __init__(self, model_path):
self.main_collaboration_process = None
self.model_path = model_path
self.subprocesses = {}
self.main_process_name = None
self.main_process = SimpleNamespace()

model_tree = ET.parse(os.path.join("models", self.model_path))
model_root = model_tree.getroot()
Expand All @@ -41,9 +48,11 @@ def __init__(self, model_path):
# Check for Collaboration
if len(processes) > 1 and p.is_main_in_collaboration:
self.main_collaboration_process = p._id
self.main_process_name = p.name
self.main_process.name = p.name
self.main_process.id = p._id
else:
self.main_process_name = p.name
self.main_process.name = p.name
self.main_process.id = p._id
# Parse all elements in the process
for tag, _type in BPMN_MAPPINGS.items():
for e in process.findall(f"{tag}", NS):
Expand All @@ -70,7 +79,10 @@ def __init__(self, model_path):
def to_json(self):
return {
"model_path": self.model_path,
"main_process_name": self.main_process_name,
"main_process_name": self.main_process.name,
"tasks": [
x.to_json() for x in self.elements.values() if isinstance(x, UserTask)
],
}

async def create_instance(self, _id, variables, process=None):
Expand Down Expand Up @@ -109,6 +121,7 @@ def handle_deployment_subprocesses(self):

class BpmnInstance:
def __init__(self, _id, model, variables, in_queue, process):
instance_models[_id] = model
self._id = _id
self.model = model
self.variables = deepcopy(variables)
Expand All @@ -124,6 +137,7 @@ def to_json(self):
"state": self.state,
"model": self.model.to_json(),
"pending": [x._id for x in self.pending],
"env": env.SYSTEM_VARS,
}

@classmethod
Expand Down
6 changes: 6 additions & 0 deletions bpmn_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class BpmnObject(object):
def __repr__(self):
return f"{type(self).__name__}({self.name or self._id})"

def to_json(self):
return {
"_id": self._id,
"name": self.name,
}

def parse(self, element):
self._id = element.attrib["id"]
self.name = element.attrib["name"] if "name" in element.attrib else None
Expand Down
78 changes: 44 additions & 34 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from aiohttp import web
from uuid import uuid4
import asyncio
from bpmn_model import BpmnModel, UserFormMessage
from bpmn_model import BpmnModel, UserFormMessage, get_model_for_instance
import aiohttp_cors
import db_connector
from functools import reduce
Expand All @@ -14,23 +14,31 @@

# uuid4 = lambda: 2 # hardcoded for easy testing

m = BpmnModel("strucna_praksa.bpmn") # hardcoded for now
models = {}
for file in os.listdir("models"):
if file.endswith(".bpmn"):
m = BpmnModel(file)
models[file] = m


async def run_with_server(app):
app["bpmn_model"] = m
async def run_as_server(app):
app["bpmn_models"] = models
log = db_connector.get_running_instances_log()
for l in log:
for key in l:
instance = await app["bpmn_model"].create_instance(key, {})
instance = await instance.run_from_log(l[key]["events"])
for key, data in l.items():
instance = await app["bpmn_models"][data["model_path"]].create_instance(
key, {}
)
instance = await instance.run_from_log(data["events"])
asyncio.create_task(instance.run())


@routes.get("/model")
@routes.get("/model/{model_name}")
async def get_model(request):
# model_id = request.match_info.get("model_id")
return web.FileResponse(path=os.path.join("models", app["bpmn_model"].model_path))
model_name = request.match_info.get("model_name")
return web.FileResponse(
path=os.path.join("models", app["bpmn_models"][model_name].model_path)
)


@routes.post("/instance")
Expand All @@ -46,9 +54,8 @@ async def handle_form(request):
post = await request.json()
instance_id = request.match_info.get("instance_id")
task_id = request.match_info.get("task_id")
app["bpmn_model"].instances[instance_id].in_queue.put_nowait(
UserFormMessage(task_id, post)
)
m = get_model_for_instance(instance_id)
m.instances[instance_id].in_queue.put_nowait(UserFormMessage(task_id, post))

return web.json_response({"status": "OK"})

Expand Down Expand Up @@ -76,29 +83,30 @@ async def search_instance(request):
result_ids = []
for (att, value) in queries:
ids = []
for _id, instance in app["bpmn_model"].instances.items():
search_atts = []
if not att:
search_atts = list(instance.variables.keys())
else:
for key in instance.variables.keys():
if not att or att in key.lower():
search_atts.append(key)
search_atts = filter(
lambda x: isinstance(instance.variables[x], str), search_atts
)
for m in models.values():
for _id, instance in m.instances.items():
search_atts = []
if not att:
search_atts = list(instance.variables.keys())
else:
for key in instance.variables.keys():
if not att or att in key.lower():
search_atts.append(key)
search_atts = filter(
lambda x: isinstance(instance.variables[x], str), search_atts
)

for search_att in search_atts:
if search_att and value in instance.variables[search_att].lower():
# data.append(instance.to_json())
ids.append(_id)
for search_att in search_atts:
if search_att and value in instance.variables[search_att].lower():
# data.append(instance.to_json())
ids.append(_id)
result_ids.append(set(ids))

ids = reduce(lambda a, x: a.intersection(x), result_ids[:-1], result_ids[0])

data = []
for _id in ids:
data.append(app["bpmn_model"].instances[_id].to_json())
data.append(get_model_for_instance(_id).instances[_id].to_json())

return web.json_response({"status": "ok", "results": data})

Expand All @@ -107,9 +115,10 @@ async def search_instance(request):
async def handle_task_info(request):
instance_id = request.match_info.get("instance_id")
task_id = request.match_info.get("task_id")
if instance_id not in app["bpmn_model"].instances:
m = get_model_for_instance(instance_id)
if not m:
raise aiohttp.web.HTTPNotFound
instance = app["bpmn_model"].instances[instance_id]
instance = m.instances[instance_id]
task = instance.model.elements[task_id]

return web.json_response(task.get_info())
Expand All @@ -118,9 +127,10 @@ async def handle_task_info(request):
@routes.get("/instance/{instance_id}")
async def handle_instance_info(request):
instance_id = request.match_info.get("instance_id")
if instance_id not in app["bpmn_model"].instances:
m = get_model_for_instance(instance_id)
if not m:
raise aiohttp.web.HTTPNotFound
instance = app["bpmn_model"].instances[instance_id].to_json()
instance = m.instances[instance_id].to_json()

return web.json_response(instance)

Expand All @@ -131,7 +141,7 @@ async def handle_instance_info(request):
def run():
global app
app = web.Application()
app.on_startup.append(run_with_server)
app.on_startup.append(run_as_server)
app.add_routes(routes)

cors = aiohttp_cors.setup(
Expand Down

0 comments on commit 280a126

Please sign in to comment.