forked from celery/py-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
64 lines (51 loc) · 1.42 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""Compatibility utilities."""
import logging
from logging import NullHandler
# enables celery 3.1.23 to start again
from vine import promise # noqa
from vine.utils import wraps
try:
import fcntl
except ImportError: # pragma: no cover
fcntl = None # noqa
def set_cloexec(fd, cloexec):
"""Set flag to close fd after exec."""
if fcntl is None:
return
try:
FD_CLOEXEC = fcntl.FD_CLOEXEC
except AttributeError:
raise NotImplementedError(
'close-on-exec flag not supported on this platform',
)
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
if cloexec:
flags |= FD_CLOEXEC
else:
flags &= ~FD_CLOEXEC
return fcntl.fcntl(fd, fcntl.F_SETFD, flags)
def coro(gen):
"""Decorator to mark generator as a co-routine."""
@wraps(gen)
def _boot(*args, **kwargs):
co = gen(*args, **kwargs)
next(co)
return co
return _boot
def str_to_bytes(s):
"""Convert str to bytes."""
if isinstance(s, str):
return s.encode('utf-8', 'surrogatepass')
return s
def bytes_to_str(s):
"""Convert bytes to str."""
if isinstance(s, bytes):
return s.decode('utf-8', 'surrogatepass')
return s
def get_logger(logger):
"""Get logger by name."""
if isinstance(logger, str):
logger = logging.getLogger(logger)
if not logger.handlers:
logger.addHandler(NullHandler())
return logger