Skip to content

Commit

Permalink
wip: warnings local_context
Browse files Browse the repository at this point in the history
  • Loading branch information
nascheme committed Dec 28, 2024
1 parent b0c3994 commit 3952bb0
Showing 1 changed file with 184 additions and 17 deletions.
201 changes: 184 additions & 17 deletions Lib/warnings.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,112 @@
"""Python part of the warnings subsystem."""

import sys
import itertools as _itertools
import contextvars as _contextvars


__all__ = ["warn", "warn_explicit", "showwarning",
"formatwarning", "filterwarnings", "simplefilter",
"resetwarnings", "catch_warnings", "deprecated"]

class _Context:
def __init__(self, filters):
self._filters = filters
self.log = None # if set to a list, logging is enabled

def copy(self):
context = _Context(self._filters[:])
return context

def _record_warning(self, msg):
self.log.append(msg)

def filterwarnings(
self,
action,
message="",
category=Warning,
module="",
lineno=0,
append=False,
):
filterwarnings(
action,
message=message,
category=category,
module=module,
lineno=lineno,
append=append,
context=self,
)

def simplefilter(self, action, category=Warning, lineno=0, append=False):
simplefilter(
action,
category=category,
lineno=lineno,
append=append,
context=self,
)

def resetwarnings(self):
resetwarnings(context=self)

def catch_warnings(
self,
*,
record=False,
action=None,
category=Warning,
lineno=0,
append=False,
):
# For easier backwards compatibility.
return _CatchManager(
record=record,
action=action,
category=category,
lineno=lineno,
append=append,
)


class _GlobalContext(_Context):
def __init__(self):
self.log = None

@property
def _filters(self):
# Since there is quite a lot of code that assigns to
# warnings.filters, this needs to return the current value of
# the module global.
return filters


_global_context = _GlobalContext()

_warnings_context = _contextvars.ContextVar('warnings_context')

def get_context():
try:
return _warnings_context.get()
except LookupError:
context = _Context([])
_warnings_context.set(context)
return context


def _set_context(context):
_warnings_context.set(context)


def _new_context():
old_context = get_context()
new_context = old_context.copy()
_set_context(new_context)
return old_context, new_context


def showwarning(message, category, filename, lineno, file=None, line=None):
"""Hook to write a warning to a file; replace if you like."""
msg = WarningMessage(message, category, filename, lineno, file, line)
Expand All @@ -18,6 +118,10 @@ def formatwarning(message, category, filename, lineno, line=None):
return _formatwarnmsg_impl(msg)

def _showwarnmsg_impl(msg):
context = get_context()
if context.log is not None:
context._record_warning(msg)
return
file = msg.file
if file is None:
file = sys.stderr
Expand Down Expand Up @@ -129,7 +233,7 @@ def _formatwarnmsg(msg):
return _formatwarnmsg_impl(msg)

def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=False):
append=False, *, context=_global_context):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "all", "default", "module",
Expand Down Expand Up @@ -165,9 +269,11 @@ def filterwarnings(action, message="", category=Warning, module="", lineno=0,
else:
module = None

_add_filter(action, message, category, module, lineno, append=append)
_add_filter(action, message, category, module, lineno, append=append,
context=context)

def simplefilter(action, category=Warning, lineno=0, append=False):
def simplefilter(action, category=Warning, lineno=0, append=False, *,
context=_global_context):
"""Insert a simple entry into the list of warnings filters (at the front).
A simple filter matches all modules and messages.
Expand All @@ -183,10 +289,12 @@ def simplefilter(action, category=Warning, lineno=0, append=False):
raise TypeError("lineno must be an int")
if lineno < 0:
raise ValueError("lineno must be an int >= 0")
_add_filter(action, None, category, None, lineno, append=append)
_add_filter(action, None, category, None, lineno, append=append,
context=context)

def _add_filter(*item, append):
def _add_filter(*item, append, context=_global_context):
with _lock:
filters = context._filters
if not append:
# Remove possible duplicate filters, so new one will be placed
# in correct place. If append=True and duplicate exists, do nothing.
Expand All @@ -200,10 +308,10 @@ def _add_filter(*item, append):
filters.append(item)
_filters_mutated()

def resetwarnings():
def resetwarnings(*, context=_global_context):
"""Clear the list of warning filters, so that no filters are active."""
with _lock:
filters[:] = []
context._filters[:] = []
_filters_mutated()

class _OptionError(Exception):
Expand Down Expand Up @@ -371,7 +479,7 @@ def warn_explicit(message, category, filename, lineno,
if registry.get(key):
return
# Search the filters
for item in filters:
for item in _itertools.chain(get_context()._filters, filters):
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
Expand Down Expand Up @@ -496,17 +604,17 @@ def __enter__(self):
self._module._filters_mutated()
self._showwarning = self._module.showwarning
self._showwarnmsg_impl = self._module._showwarnmsg_impl
if self._record:
log = []
self._module._showwarnmsg_impl = log.append
# Reset showwarning() to the default implementation to make sure
# that _showwarnmsg() calls _showwarnmsg_impl()
self._module.showwarning = self._module._showwarning_orig
else:
log = None
if self._filter is not None:
simplefilter(*self._filter)
if self._record:
log = []
self._module._showwarnmsg_impl = log.append
# Reset showwarning() to the default implementation to make sure
# that _showwarnmsg() calls _showwarnmsg_impl()
self._module.showwarning = self._module._showwarning_orig
return log
else:
return None
return log

def __exit__(self, *exc_info):
if not self._entered:
Expand All @@ -518,6 +626,64 @@ def __exit__(self, *exc_info):
self._module._showwarnmsg_impl = self._showwarnmsg_impl


class local_context:
"""A context manager that copies and restores the warnings filter upon
exiting the context. This uses a context variable so that the filter
changes are thread local and work as expected with asynchronous task
switching.
The 'record' argument specifies whether warnings should be captured rather
than being emitted by warnings.showwarning(). When capture is enabled, the
list of warnings is available as get_context().log.
"""
def __init__(self, *, record=False):
self._record = record
self._entered = False

def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
self._saved_context, context = _new_context()
if self._record:
context.log = []
_filters_mutated()
return context

def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
_warnings_context.set(self._saved_context)
_filters_mutated()


class _CatchManager(local_context):
"""Context manager used by get_context().catch_warnings()."""
def __init__(
self,
*,
record=False,
action=None,
category=Warning,
lineno=0,
append=False,
):
super().__init__(record=record)
if action is None:
self._filter = None
else:
self._filter = (action, category, lineno, append)

def __enter__(self):
context = super().__enter__()
if self._filter is not None:
context.simplefilter(*self._filter)
return context.log

def __exit__(self, *exc_info):
context = super().__exit__(*exc_info)


class deprecated:
"""Indicate that a class, function or overload is deprecated.
Expand Down Expand Up @@ -704,6 +870,7 @@ def extract():
# - a line number for the line being warning, or 0 to mean any line
# If either if the compiled regexs are None, match anything.
try:
raise ImportError # FIXME: temporary, until _warnings is updated
from _warnings import (filters, _defaultaction, _onceregistry,
warn, warn_explicit, _filters_mutated,
_acquire_lock, _release_lock,
Expand Down

0 comments on commit 3952bb0

Please sign in to comment.