forked from dancaron/Django-ORM
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathjob.py
121 lines (100 loc) · 3.45 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# -*- coding: utf-8 -*-
"""
A Job is MCP Client's representation of a unit of work to be
performed--corresponding to a Task on the MCP Server side. Jobs are run in
batches by clientScript modules and populated with an exit code, standard out
and standard error information.
"""
from __future__ import absolute_import
import traceback
import sys
import logging
from contextlib import contextmanager
from custom_handlers import CallbackHandler
LOGGER = logging.getLogger("archivematica.mcp.client.job")
class Job:
def __init__(self, name, uuid, args, caller_wants_output=False):
self.name = name
self.UUID = uuid
self.args = [name] + args
self.caller_wants_output = caller_wants_output
self.int_code = 0
self.status_code = "success"
self.output = ""
self.error = ""
def dump(self):
return (
u"#<%s; exit=%s; code=%s uuid=%s\n"
u"=============== STDOUT ===============\n"
u"%s"
u"\n=============== END STDOUT ===============\n"
u"=============== STDERR ===============\n"
u"%s"
u"\n=============== END STDERR ===============\n"
u"\n>"
) % (
self.name,
self.int_code,
self.status_code,
self.UUID,
self.get_stdout(),
self.get_stderr(),
)
def load_from(self, other_job):
self.name = other_job.name
self.UUID = other_job.UUID
self.args = other_job.args
self.caller_wants_output = other_job.caller_wants_output
self.int_code = other_job.int_code
self.status_code = other_job.status_code
self.output = other_job.output
self.error = other_job.error
def set_status(self, int_code, status_code="success"):
if int_code:
self.int_code = int(int_code)
self.status_code = status_code
def write_output(self, s):
self.output += s
def write_error(self, s):
self.error += s
def print_output(self, *args):
self.write_output(" ".join([self._to_str(x) for x in args]) + "\n")
def print_error(self, *args):
self.write_error(" ".join([self._to_str(x) for x in args]) + "\n")
@staticmethod
def _to_str(thing):
try:
return str(thing)
except UnicodeEncodeError:
return thing.encode("utf8")
def pyprint(self, *objects, **kwargs):
file = kwargs.get("file", sys.stdout)
sep = kwargs.get("sep", " ")
end = kwargs.get("end", "\n")
msg = sep.join([self._to_str(x) for x in objects]) + end
if file == sys.stdout:
self.write_output(msg)
elif file == sys.stderr:
self.write_error(msg)
else:
raise Exception("Unrecognised print file: " + str(file))
def get_exit_code(self):
return self.int_code
def get_stdout(self):
return self.output.decode("utf-8")
def get_stderr(self):
return self.error.decode("utf-8")
@contextmanager
def JobContext(self, logger=None):
handler = CallbackHandler(self.print_error, self.name)
if logger:
logger.addHandler(handler)
try:
yield
except Exception as e:
self.write_error(str(e))
self.write_error(traceback.format_exc())
self.set_status(1)
finally:
if logger:
logger.removeHandler(handler)