forked from certbot/certbot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreporter.py
119 lines (95 loc) · 4.06 KB
/
reporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Collects and displays information to the user."""
from __future__ import print_function
import collections
import logging
import os
import sys
import textwrap
from six.moves import queue # pylint: disable=import-error
import zope.interface
from certbot import interfaces
from certbot import util
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Store the pid of the process that first imported this module so that
# atexit_print_messages side-effects such as error reporting can be limited to
# this process and not any fork()'d children.
INITIAL_PID = os.getpid()
@zope.interface.implementer(interfaces.IReporter)
class Reporter(object):
    """Collects and displays information to the user.

    :ivar `queue.PriorityQueue` messages: Messages to be displayed to
        the user.
    :ivar config: Configuration object; its ``quiet`` attribute is read
        when messages are printed.

    """

    HIGH_PRIORITY = 0
    """High priority constant. See `add_message`."""
    MEDIUM_PRIORITY = 1
    """Medium priority constant. See `add_message`."""
    LOW_PRIORITY = 2
    """Low priority constant. See `add_message`."""

    # Queue entries are plain tuples, so the priority queue orders them by
    # priority first and then by the remaining fields.
    _msg_type = collections.namedtuple('ReporterMsg', 'priority text on_crash')

    def __init__(self, config):
        """Creates an empty message queue and stores ``config``.

        :param config: Configuration object providing a ``quiet``
            attribute.

        """
        self.messages = queue.PriorityQueue()
        self.config = config

    def add_message(self, msg, priority, on_crash=True):
        """Adds msg to the list of messages to be printed.

        :param str msg: Message to be displayed to the user.

        :param int priority: One of `HIGH_PRIORITY`, `MEDIUM_PRIORITY`,
            or `LOW_PRIORITY`.

        :param bool on_crash: Whether or not the message should be
            printed if the program exits abnormally.

        :raises AssertionError: if ``priority`` is outside the range
            `HIGH_PRIORITY`..`LOW_PRIORITY` (not checked when Python
            runs with assertions disabled).

        """
        assert self.HIGH_PRIORITY <= priority <= self.LOW_PRIORITY
        self.messages.put(self._msg_type(priority, msg, on_crash))
        logger.debug("Reporting to user: %s", msg)

    def atexit_print_messages(self, pid=None):
        """Function to be registered with atexit to print messages.

        :param int pid: Process ID allowed to print; defaults to
            ``INITIAL_PID`` when ``None``.

        """
        if pid is None:
            pid = INITIAL_PID
        # Only print from the expected process (by default the one that
        # first imported this module), not from fork()'d children.
        if pid == os.getpid():
            self.print_messages()

    def print_messages(self):
        """Prints messages to the user and clears the message queue.

        If there is an unhandled exception, only messages for which
        ``on_crash`` is ``True`` are printed. When ``config.quiet`` is
        set, the header and ANSI formatting are omitted and only high
        priority messages flagged ``on_crash`` are printed.

        """
        if self.messages.empty():
            return

        quiet = self.config.quiet
        no_exception = sys.exc_info()[0] is None
        # Bold is only used when stdout is a terminal; it stays on for the
        # header and the high priority messages.
        bold_on = sys.stdout.isatty()
        if not quiet:
            if bold_on:
                print(util.ANSI_SGR_BOLD)
            print('IMPORTANT NOTES:')

        # The first line of a message gets a bullet; every following line
        # is indented to line up with the text after the bullet.
        first_wrapper = textwrap.TextWrapper(
            initial_indent=' - ',
            subsequent_indent=(' ' * 3),
            break_long_words=False,
            break_on_hyphens=False)
        next_wrapper = textwrap.TextWrapper(
            initial_indent=first_wrapper.subsequent_indent,
            subsequent_indent=first_wrapper.subsequent_indent,
            break_long_words=False,
            break_on_hyphens=False)

        while not self.messages.empty():
            msg = self.messages.get()
            # In --quiet mode, we only print high priority messages that
            # are flagged for crash cases
            if quiet and not (
                    msg.priority == self.HIGH_PRIORITY and msg.on_crash):
                continue
            # While an exception is being handled, drop messages that are
            # not meant to be shown on a crash.
            if not (no_exception or msg.on_crash):
                continue
            # Messages come out in priority order, so bold can be switched
            # off for good at the first message below high priority.
            if bold_on and not quiet and msg.priority > self.HIGH_PRIORITY:
                sys.stdout.write(util.ANSI_SGR_RESET)
                bold_on = False
            # ''.splitlines() is [], which used to raise IndexError on
            # lines[0]; fall back to a single empty line so an empty
            # message cannot abort printing of the remaining messages.
            lines = msg.text.splitlines() or ['']
            print(first_wrapper.fill(lines[0]))
            if len(lines) > 1:
                print("\n".join(
                    next_wrapper.fill(line) for line in lines[1:]))

        if bold_on and not quiet:
            sys.stdout.write(util.ANSI_SGR_RESET)