-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathututil.py
214 lines (150 loc) · 4.88 KB
/
ututil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
unittest utility
"""
import inspect
import logging
import os
import socket
import time
import unittest
# Module-wide mutable state; 'unittest_logger' stays None until _init()
# manages to build the shared logger.
_glb = {'unittest_logger': None}

# True only when the environment variable UT_DEBUG is exactly '1'.
debug_to_stdout = os.getenv('UT_DEBUG') == '1'
# TODO make this configurable
# logging.basicConfig(level='INFO',
# format='[%(asctime)s,%(process)d-%(thread)d,%(filename)s,%(lineno)d,%(levelname)s] %(message)s',
# datefmt='%H:%M:%S'
# )
# logger = logging.getLogger('kazoo')
# logger.setLevel('INFO')
class Timer(object):
    """
    Context manager that measures wall-clock time with time.time().

    Usage:

        with Timer() as t:
            do_something()
        print(t.spent())

    `start` and `end` hold the timestamps taken on entering and leaving the
    `with` block; both are None before the first use.
    """

    def __init__(self):
        self.start = None
        self.end = None

    def __enter__(self):
        # Forget the previous run first: without this, a reused Timer would
        # compute spent() inside the block from the stale `end` of the
        # earlier run and report a negative duration.
        self.end = None
        self.start = time.time()
        return self

    def __exit__(self, errtype, errval, _traceback):
        # Returns None, so an exception raised inside the block propagates.
        self.end = time.time()

    def spent(self):
        """
        Return the elapsed seconds as a float.

        Inside the `with` block this is the time elapsed so far; after the
        block it is the fixed duration `end - start`.

        Raises TypeError if the timer has never been entered, because
        `start` is still None.
        """
        if self.end is None:
            finish = time.time()
        else:
            finish = self.end
        return finish - self.start
class ContextFilter(logging.Filter):
    """
    Attach the real caller's location to every log record.

    Three extra attributes are set on each record: `_fn` (file base name),
    `_ln` (line number) and `_func` (function name).

    When a test case logs through dd() instead of calling logger.debug()
    directly, logging always reports the location of dd() itself rather
    than the test_xxx function that called it.  This filter looks for the
    dd() frame on the stack and records the frame just above it instead.
    Records that did not come through dd() keep their own location.
    """

    def filter(self, record):
        # The first stack entry is this method's own frame; drop it.
        callers = inspect.stack()[1:]

        for idx, info in enumerate(callers):
            frame = info[0]
            func_name = info[3]

            came_from_dd = (
                func_name == 'dd'
                and frame.f_globals.get('__name__') == 'pykit.ututil'
            )
            if not came_from_dd:
                continue

            # The entry right after dd() is whoever called dd().
            caller = callers[idx + 1]
            record._fn = os.path.basename(caller[1])
            record._ln = caller[2]
            record._func = caller[3]
            return True

        # No dd() frame on the stack: fall back to what logging recorded.
        record._fn = record.filename
        record._ln = record.lineno
        record._func = record.funcName
        return True
def _init():
    """
    Create the shared 'unittest' logger once and cache it in `_glb`.

    Does nothing when the logger already exists.  Any failure is printed
    and swallowed, leaving `_glb['unittest_logger']` as None so that the
    next call tries again.
    """
    if _glb['unittest_logger'] is None:
        # logutil is imported here, under the try, on purpose:
        # test_logutil might require this module and logutil is still
        # under test!
        try:
            from pykit import logutil

            log_format = ('[%(asctime)s'
                          ' %(_fn)s:%(_ln)d'
                          ' %(levelname)s]'
                          ' %(message)s')

            # NOTE(review): presumably make_logger() writes the log file
            # under '/tmp' -- confirm against pykit.logutil.
            ut_logger = logutil.make_logger(
                '/tmp',
                log_name='unittest',
                level='DEBUG',
                fmt=log_format,
            )

            # The format refers to _fn/_ln, which ContextFilter supplies.
            ut_logger.addFilter(ContextFilter())
            _glb['unittest_logger'] = ut_logger
        except Exception as e:
            print(repr(e) + ' while init root logger')
def dd(*msg):
    """
    Debug-level logging helper for test case functions (test_xx).

    Every argument is turned into a byte string (unicode values are UTF-8
    encoded, everything else goes through str()) and the pieces are joined
    with single spaces.

    The resulting line is sent at DEBUG level to the shared unittest logger
    whenever that logger could be initialised, and is additionally printed
    to stdout when the module flag `debug_to_stdout` is set.
    """
    parts = []
    for item in msg:
        if isinstance(item, unicode):
            parts.append(item.encode('utf-8'))
        else:
            parts.append(str(item))
    text = ' '.join(parts)

    _init()

    # Keep the logging call directly inside dd(): ContextFilter identifies
    # the real caller as the frame right above the one named 'dd'.
    ut_logger = _glb['unittest_logger']
    if ut_logger:
        ut_logger.debug(text)

    if debug_to_stdout:
        print(text)
def get_ut_verbosity():
    """
    Return the `verbosity` attribute of the unittest.TestProgram instance
    found on the call stack, or 0 when no such instance is found (i.e. no
    unittest program is currently running).
    """
    program_frame = _find_frame_by_self(unittest.TestProgram)

    if program_frame is not None:
        # The frame was selected because its local 'self' is a TestProgram.
        return program_frame.f_locals.get('self').verbosity

    return 0
def get_case_logger():
    """
    Return a logger dedicated to the currently running test case.

    The logger is named `<module>.<class>.<function>`, for example:
    `pykit.strutil.test.test_strutil.TestStrutil.test_format_line`

    Call it from inside a test_* method of a unittest.TestCase; otherwise
    no correct module/class/function name can be found.

    A ContextFilter is attached to the logger unless one is already there.
    """
    # Nearest frame on the stack whose local 'self' is a TestCase.
    case_frame = _find_frame_by_self(unittest.TestCase)
    test_case = case_frame.f_locals.get('self')

    logger_name = '.'.join((
        case_frame.f_globals.get('__name__'),
        test_case.__class__.__name__,
        case_frame.f_code.co_name,
    ))

    case_logger = logging.getLogger(logger_name)

    # logging.getLogger() hands back the same object for the same name, so
    # make sure the filter is only ever added once.
    if not any(isinstance(flt, ContextFilter) for flt in case_logger.filters):
        case_logger.addFilter(ContextFilter())

    return case_logger
def _find_frame_by_self(clz):
    """
    Walk the call stack outwards, starting at this function's own frame,
    and return the first frame that has a local variable named 'self' which
    is an instance of `clz`.

    Return None when no frame on the stack matches.
    """
    cursor = inspect.currentframe()

    while cursor is not None:
        if isinstance(cursor.f_locals.get('self'), clz):
            return cursor
        # Step to the caller's frame.
        cursor = cursor.f_back

    return None
def wait_listening(ip, port, timeout=15, interval=0.5):
    """
    Wait at most `timeout` seconds for a TCP service to accept connections
    on (ip, port).

    A connection is attempted repeatedly, sleeping `interval` seconds
    between attempts, until one succeeds or the time budget is used up.
    At least one attempt is always made, even with a timeout of 0.

    :param ip: IPv4 address of the service.
    :param port: TCP port of the service.
    :param timeout: maximum number of seconds to keep retrying.
    :param interval: number of seconds to sleep between two attempts.
    :return: None as soon as a connection has been established.
    :raise socket.error: the error of the last failed attempt, when the
        service could not be reached within `timeout` seconds.
    """
    deadline = time.time() + timeout

    while True:
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.connect((ip, port))
            return
        except socket.error:
            dd('trying to connect to {0} failed'.format(str((ip, port))))

            remaining = deadline - time.time()
            if remaining <= 0:
                # Out of time: re-raise from inside the handler so the
                # caller sees the last connection error and its traceback.
                raise
        finally:
            # Runs on success and on failure alike, so the probe socket is
            # never leaked; `sock` is still None if socket() itself failed.
            if sock is not None:
                sock.close()

        # Never sleep past the deadline.
        time.sleep(max(0, min(interval, remaining)))
def has_env(kv):
    """
    Tell whether an environment variable currently has a given value.

    `kv` has the form 'KEY=value'; only the first '=' separates the key
    from the value, so the value itself may contain '='.  A `kv` without
    any '=' raises ValueError.

    Return True when the variable KEY is set and equal to value, False
    otherwise.
    """
    key, expected = kv.split('=', 1)
    actual = os.getenv(key)
    return actual == expected