Skip to content

Commit

Permalink
Refactor task XML creation
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.code.sf.net/p/dispcalgui/code/trunk@6209 6ae35eb0-b8ff-4ca7-919d-0ec1eff389cd
  • Loading branch information
fhoech committed Sep 16, 2019
1 parent 908cf36 commit de45057
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 74 deletions.
163 changes: 89 additions & 74 deletions DisplayCAL/taskscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@

from log import safe_print
from meta import name as appname
from ordereddict import OrderedDict
from safe_print import enc
from util_os import getenvu
from util_str import safe_str, safe_unicode, universal_newlines
from util_str import indent, safe_str, safe_unicode, universal_newlines
from util_win import run_as_admin


Expand All @@ -63,35 +64,69 @@
MULTIPLEINSTANCES_STOPEXISTING = "StopExisting"


class LogonTrigger(object):
class _Dict2XML(OrderedDict):

def __init__(self, enabled=True):
self.enabled = enabled
# Subclass this

def __init__(self, *args, **kwargs):
OrderedDict.__init__(self, *args, **kwargs)
if not "cls_name" in self:
self["cls_name"] = self.__class__.__name__
if not "cls_attr" in self:
self["cls_attr"] = ""

def __unicode__(self):
return """ <LogonTrigger>
<Enabled>%s</Enabled>
</LogonTrigger>""" % str(self.enabled).lower()
items = []
for name, value in self.iteritems():
if isinstance(value, bool):
value = str(value).lower()
elif name in ("cls_name", "cls_attr") or not value:
continue
if isinstance(value, _Dict2XML):
item = unicode(value)
else:
cc = "".join(part[0].upper() + part[1:]
for part in name.split("_"))
if isinstance(value, (list, tuple)):
item = "\n".join([unicode(item) for item in value])
else:
item = "<%(cc)s>%(value)s</%(cc)s>" % {"cc": cc, "value": value}
items.append(indent(item, " "))
return """<%(cls_name)s%(cls_attr)s>
%(items)s
</%(cls_name)s>""" % {"cls_name": self["cls_name"], "cls_attr": self["cls_attr"],
"items": "\n".join(items)}


class ExecAction(object):
class _Trigger(_Dict2XML):

def __init__(self, cmd, args=None):
self.cmd = cmd
self.args = args or []
# Subclass this

def __unicode__(self):
return """ <Exec>
<Command>%s</Command>
<Arguments>%s</Arguments>
</Exec>""" % (self.cmd, sp.list2cmdline(self.args))
def __init__(self, enabled=True):
kwargs = locals()
for key in ("self",):
del kwargs[key]
_Dict2XML.__init__(self, kwargs.iteritems())


class Task(object):
class LogonTrigger(_Trigger):

pass


class ExecAction(_Dict2XML):

def __init__(self, cmd, args=None):
_Dict2XML.__init__(self, command=cmd,
arguments=args and sp.list2cmdline(args) or None,
cls_name="Exec")


class Task(_Dict2XML):

def __init__(self, name="", author="", description="", group_id="S-1-5-4",
runlevel=RUNLEVEL_LEASTPRIVILEGE,
multiple_instances=MULTIPLEINSTANCES_IGNORENEW,
run_level=RUNLEVEL_LEASTPRIVILEGE,
multiple_instances_policy=MULTIPLEINSTANCES_IGNORENEW,
disallow_start_if_on_batteries=False,
stop_if_going_on_batteries=False,
allow_hard_terminate=True,
Expand All @@ -108,67 +143,47 @@ def __init__(self, name="", author="", description="", group_id="S-1-5-4",
priority=5,
triggers=None,
actions=None):
self.kwargs = locals()
self.triggers = triggers or []
self.actions = actions or []
kwargs = locals()
for key in ("self", "name", "author", "description", "group_id",
"run_level", "stop_on_idle_end", "restart_on_idle",
"triggers", "actions"):
del kwargs[key]
settings = _Dict2XML(kwargs.iteritems(), cls_name="Settings")
settings["idle_settings"] = _Dict2XML(stop_on_idle_end=stop_on_idle_end,
restart_on_idle=restart_on_idle,
cls_name="IdleSettings")
kwargs = OrderedDict()
kwargs["registration_info"] = _Dict2XML(author=author,
description=description,
URI="\\" + name,
cls_name="RegistrationInfo")
kwargs["triggers"] = _Dict2XML(items=triggers or [],
cls_name="Triggers")
kwargs["principals"] = _Dict2XML(items=[_Dict2XML(group_id=group_id,
run_level=run_level,
cls_name="Principal",
cls_attr=' id="Author"')],
cls_name="Principals")
kwargs["settings"] = settings
kwargs["actions"] = _Dict2XML(items=actions or [],
cls_name="Actions",
cls_attr=' Context="Author"')
kwargs["cls_attr"] = ' version="1.2" xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task"'
_Dict2XML.__init__(self, kwargs.iteritems())

def add_exec_action(self, cmd, args=None):
self.actions.append(ExecAction(cmd, args))
self["actions"]["items"].append(ExecAction(cmd, args))

def add_logon_trigger(self, enabled=True):
self.triggers.append(LogonTrigger(enabled))
self["triggers"]["items"].append(LogonTrigger(enabled))

def write_xml(self, xmlfilename):
with open(xmlfilename, "wb") as xmlfile:
xmlfile.write(codecs.BOM_UTF16_LE + str(self))

def __str__(self):
kwargs = dict(self.kwargs)
for name, value in kwargs.iteritems():
if isinstance(value, bool):
kwargs[name] = str(value).lower()
triggers = "\n".join(unicode(trigger) for trigger in self.triggers)
actions = "\n".join(unicode(action) for action in self.actions)
kwargs.update({"triggers": triggers, "actions": actions})
return universal_newlines(("""<?xml version="1.0" encoding="UTF-16"?>
<Task version="1.2" xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task">
<RegistrationInfo>
<Author>%(author)s</Author>
<Description>%(description)s</Description>
<URI>\%(name)s</URI>
</RegistrationInfo>
<Triggers>
%(triggers)s
</Triggers>
<Principals>
<Principal id="Author">
<GroupId>%(group_id)s</GroupId>
<RunLevel>%(runlevel)s</RunLevel>
</Principal>
</Principals>
<Settings>
<MultipleInstancesPolicy>%(multiple_instances)s</MultipleInstancesPolicy>
<DisallowStartIfOnBatteries>%(disallow_start_if_on_batteries)s</DisallowStartIfOnBatteries>
<StopIfGoingOnBatteries>%(stop_if_going_on_batteries)s</StopIfGoingOnBatteries>
<AllowHardTerminate>%(allow_hard_terminate)s</AllowHardTerminate>
<StartWhenAvailable>%(start_when_available)s</StartWhenAvailable>
<RunOnlyIfNetworkAvailable>%(run_only_if_network_available)s</RunOnlyIfNetworkAvailable>
<IdleSettings>
<StopOnIdleEnd>%(stop_on_idle_end)s</StopOnIdleEnd>
<RestartOnIdle>%(restart_on_idle)s</RestartOnIdle>
</IdleSettings>
<AllowStartOnDemand>%(allow_start_on_demand)s</AllowStartOnDemand>
<Enabled>%(enabled)s</Enabled>
<Hidden>%(hidden)s</Hidden>
<RunOnlyIfIdle>%(run_only_if_idle)s</RunOnlyIfIdle>
<WakeToRun>%(wake_to_run)s</WakeToRun>
<ExecutionTimeLimit>%(execution_time_limit)s</ExecutionTimeLimit>
<Priority>%(priority)i</Priority>
</Settings>
<Actions Context="Author">
%(actions)s
</Actions>
</Task>""" % kwargs)).replace("\n", "\r\n").encode("UTF-16-LE")
return universal_newlines("""<?xml version="1.0" encoding="UTF-16"?>
%s""" % unicode(self)).replace("\n", "\r\n").encode("UTF-16-LE")


class TaskScheduler(object):
Expand Down Expand Up @@ -200,8 +215,8 @@ def __iter__(self):

def create_task(self, name, author="", description="",
group_id="S-1-5-4",
runlevel=RUNLEVEL_LEASTPRIVILEGE,
multiple_instances=MULTIPLEINSTANCES_IGNORENEW,
run_level=RUNLEVEL_LEASTPRIVILEGE,
multiple_instances_policy=MULTIPLEINSTANCES_IGNORENEW,
disallow_start_if_on_batteries=False,
stop_if_going_on_batteries=False,
allow_hard_terminate=True,
Expand Down Expand Up @@ -252,8 +267,8 @@ def create_task(self, name, author="", description="",
def create_logon_task(self, name, cmd, args=None,
author="", description="",
group_id="S-1-5-4",
runlevel=RUNLEVEL_LEASTPRIVILEGE,
multiple_instances=MULTIPLEINSTANCES_IGNORENEW,
run_level=RUNLEVEL_LEASTPRIVILEGE,
multiple_instances_policy=MULTIPLEINSTANCES_IGNORENEW,
disallow_start_if_on_batteries=False,
stop_if_going_on_batteries=False,
allow_hard_terminate=True,
Expand Down
19 changes: 19 additions & 0 deletions DisplayCAL/util_str.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,25 @@ def hexunescape(match):
return unichr(int(match.group(1), 16))


def indent(text, prefix, predicate=None):
# From Python 3.7 textwrap module
"""Adds 'prefix' to the beginning of selected lines in 'text'.
If 'predicate' is provided, 'prefix' will only be added to the lines
where 'predicate(line)' is True. If 'predicate' is not provided,
it will default to adding 'prefix' to all non-empty lines that do not
consist solely of whitespace characters.
"""
if predicate is None:
def predicate(line):
return line.strip()

def prefixed_lines():
for line in text.splitlines(True):
yield (prefix + line if predicate(line) else line)
return ''.join(prefixed_lines())


def universal_newlines(txt):
"""
Return txt with all new line formats converted to POSIX newlines.
Expand Down

0 comments on commit de45057

Please sign in to comment.