Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD,load_diagrams için threads parametresi ve load_diagram metodu ekl… #90

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ADD,Thread parameter and thread method added for load_diagrams
  • Loading branch information
abdullahbulu committed Jul 28, 2017
commit cc07d9efc1489417f8a7e0c2350d7743e0bd42da
72 changes: 49 additions & 23 deletions zengine/management_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import sys
import tempfile
import threading
from distutils.errors import DistutilsError

from pyoko.db.adapter.db_riak import BlockSave, BlockDelete
Expand Down Expand Up @@ -289,7 +290,8 @@ def _init_update_po_files(self, domains):
"""Update or initialize the `.po` translation files"""
for language in settings.TRANSLATIONS:
for domain, options in domains.items():
if language == options['default']: continue # Default language of the domain doesn't need translations
if language == options[
'default']: continue # Default language of the domain doesn't need translations
if os.path.isfile(_po_path(language, domain)):
# If the translation already exists, update it, keeping the parts already translated
self._update_po_file(language, domain, options['pot'])
Expand All @@ -298,13 +300,15 @@ def _init_update_po_files(self, domains):
self._init_po_file(language, domain, options['pot'])

def _update_po_file(self, language, domain, pot_path):
print('Updating po file for {language} in domain {domain}'.format(language=language, domain=domain))
print('Updating po file for {language} in domain {domain}'.format(language=language,
domain=domain))
updater = babel_frontend.update_catalog()
_setup_babel_command(updater, domain, language, pot_path)
_run_babel_command(updater)

def _init_po_file(self, language, domain, pot_path):
print('Creating po file for {language} in domain {domain}'.format(language=language, domain=domain))
print('Creating po file for {language} in domain {domain}'.format(language=language,
domain=domain))
initializer = babel_frontend.init_catalog()
_setup_babel_command(initializer, domain, language, pot_path)
_run_babel_command(initializer)
Expand All @@ -328,7 +332,9 @@ def run(self):
for language in settings.TRANSLATIONS:
for domain, default_lang in settings.TRANSLATION_DOMAINS.items():
if language == default_lang: continue # Default language of the domain doesn't need translations
print('Compiling po file for {language} in domain {domain}'.format(language=language, domain=domain))
print(
'Compiling po file for {language} in domain {domain}'.format(language=language,
domain=domain))
compiler = babel_frontend.compile_catalog()
_setup_babel_command(compiler, domain, language, _po_path(language, domain))
_run_babel_command(compiler)
Expand All @@ -352,7 +358,8 @@ def _setup_babel_command(babel, domain, language, input_file):


def _po_path(language, domain):
return os.path.join(settings.TRANSLATIONS_DIR, language, 'LC_MESSAGES', '{domain}.po'.format(domain=domain))
return os.path.join(settings.TRANSLATIONS_DIR, language, 'LC_MESSAGES',
'{domain}.po'.format(domain=domain))


class PrepareMQ(Command):
Expand Down Expand Up @@ -427,7 +434,18 @@ def list_system_views(self):
print(" |_ %s" % view)


class LoadDiagrams(Command):
class AtomicCounter():
def __init__(self, initial=0):
self.value = initial
self._lock = threading.Lock()

def increment(self, num=1):
with self._lock:
self.value += num
return self.value


class LoadDiagrams(Command, BaseThreadedCommand):
"""
Loads wf diagrams from disk to DB
"""
Expand All @@ -437,6 +455,7 @@ class LoadDiagrams(Command):
PARAMS = [
{'name': 'wf_path', 'default': None,
'help': 'Only update given BPMN diagram'},
{'name': 'threads', 'default': 30, 'help': 'Max number of threads. Defaults to 1'},

{'name': 'clear', 'action': 'store_true',
'help': 'Clear all TaskManager related models'},
Expand All @@ -451,7 +470,6 @@ def run(self):
read workflows, checks if it's updated,
tries to update if there aren't any running instances of that wf
"""
from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, RunningInstancesExist
from zengine.lib.cache import WFSpecNames

if self.manager.args.clear:
Expand All @@ -462,22 +480,30 @@ def run(self):
paths = self.get_wf_from_path(self.manager.args.wf_path)
else:
paths = self.get_workflows()
count = 0
for wf_name, content in paths:
wf, wf_is_new = BPMNWorkflow.objects.get_or_create(name=wf_name)
content = self._tmp_fix_diagram(content)
diagram, diagram_is_updated = DiagramXML.get_or_create_by_content(wf_name, content)
if wf_is_new or diagram_is_updated or self.manager.args.force:
count += 1
print("%s created or updated" % wf_name.upper())
try:
wf.set_xml(diagram, self.manager.args.force)
except RunningInstancesExist as e:
print(e.message)
print("Give \"--force\" parameter to enforce")

self.counter = AtomicCounter()

self.do_with_submit(self.load_diagram, paths, threads=self.manager.args.threads)
WFSpecNames().refresh()

print("%s BPMN file loaded" % count)
print("%s BPMN file loaded" % self.counter.value)

def load_diagram(self, paths):
from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, RunningInstancesExist

wf_name, content = paths
key = 'bpmn_workflow_%s' % wf_name
wf, wf_is_new = BPMNWorkflow.objects.get_or_create(name=wf_name, key=key)
content = self._tmp_fix_diagram(content)
diagram, diagram_is_updated = DiagramXML.get_or_create_by_content(wf_name, content)
if wf_is_new or diagram_is_updated or self.manager.args.force:
self.counter.increment()
print("%s created or updated" % wf_name.upper())
try:
wf.set_xml(diagram, self.manager.args.force)
except RunningInstancesExist as e:
print(e.message)
print("Give \"--force\" parameter to enforce")

def _clear_models(self):
from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, WFInstance, \
Expand Down Expand Up @@ -584,7 +610,7 @@ def check_redis():

try:
cache.ping()
print(CheckList.OKGREEN+"{0}Redis is working{1}"+CheckList.ENDC)
print(CheckList.OKGREEN + "{0}Redis is working{1}" + CheckList.ENDC)
except ConnectionError as e:
print(__(u"{0}Redis is not working{1} ").format(CheckList.FAIL,
CheckList.ENDC), e.message)
Expand Down Expand Up @@ -666,7 +692,7 @@ def run(self):
if prefix_name != "":
if prefix_name != 'all':
for name in prefix_name.split(','):
keys = cache.keys(name+"*")
keys = cache.keys(name + "*")
for key in keys:
cache.delete(key)
print("%d object(s) deleted from cache with PREFIX %s " % (len(keys), name))
Expand Down