Skip to content

Commit

Permalink
wip: warnings local_context
Browse files Browse the repository at this point in the history
  • Loading branch information
nascheme committed Dec 27, 2024
1 parent 1d7ded0 commit 1f8b441
Showing 1 changed file with 182 additions and 9 deletions.
191 changes: 182 additions & 9 deletions Lib/warnings.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,113 @@
"""Python part of the warnings subsystem."""

import sys
import itertools as _itertools
import contextvars as _contextvars


__all__ = ["warn", "warn_explicit", "showwarning",
"formatwarning", "filterwarnings", "simplefilter",
"resetwarnings", "catch_warnings", "deprecated"]

class _Context:
def __init__(self, filters):
self._filters = filters
self.log = None # if set to a list, logging is enabled

def copy(self):
context = _Context(self._filters[:])
return context

def _record_warning(self, msg):
self.log.append(msg)

def filterwarnings(
self,
action,
message="",
category=Warning,
module="",
lineno=0,
append=False,
):
filterwarnings(
action,
message=message,
category=category,
module=module,
lineno=lineno,
append=append,
context=self,
)

def simplefilter(self, action, category=Warning, lineno=0, append=False):
simplefilter(
action,
category=category,
lineno=lineno,
append=append,
context=self,
)

def resetwarnings(self):
resetwarnings(context=self)

def catch_warnings(
self,
*,
record=False,
action=None,
category=Warning,
lineno=0,
append=False,
):
# For easier backwards compatibility.
return _CatchManager(
record=record,
action=action,
category=category,
lineno=lineno,
append=append,
)


class _GlobalContext(_Context):
def __init__(self):
self.log = None

@property
def _filters(self):
# Since there is quite a lot of code that assigns to
# warnings.filters, this needs to return the current value of
# the module global.
return filters


_global_context = _GlobalContext()

_warnings_context = _contextvars.ContextVar('warnings_context')


def get_context():
try:
return _warnings_context.get()
except LookupError:
context = _Context([])
_warnings_context.set(context)
return context


def _set_context(context):
_warnings_context.set(context)


def _new_context():
old_context = get_context()
new_context = old_context.copy()
_set_context(new_context)
return old_context, new_context


def showwarning(message, category, filename, lineno, file=None, line=None):
"""Hook to write a warning to a file; replace if you like."""
msg = WarningMessage(message, category, filename, lineno, file, line)
Expand All @@ -18,6 +119,10 @@ def formatwarning(message, category, filename, lineno, line=None):
return _formatwarnmsg_impl(msg)

def _showwarnmsg_impl(msg):
context = get_context()
if context.log is not None:
context._record_warning(msg)
return
file = msg.file
if file is None:
file = sys.stderr
Expand Down Expand Up @@ -129,7 +234,7 @@ def _formatwarnmsg(msg):
return _formatwarnmsg_impl(msg)

def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=False):
append=False, *, context=_global_context):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "all", "default", "module",
Expand Down Expand Up @@ -165,9 +270,11 @@ def filterwarnings(action, message="", category=Warning, module="", lineno=0,
else:
module = None

_add_filter(action, message, category, module, lineno, append=append)
_add_filter(action, message, category, module, lineno, append=append,
context=context)

def simplefilter(action, category=Warning, lineno=0, append=False):
def simplefilter(action, category=Warning, lineno=0, append=False, *,
context=_global_context):
"""Insert a simple entry into the list of warnings filters (at the front).
A simple filter matches all modules and messages.
Expand All @@ -183,10 +290,12 @@ def simplefilter(action, category=Warning, lineno=0, append=False):
raise TypeError("lineno must be an int")
if lineno < 0:
raise ValueError("lineno must be an int >= 0")
_add_filter(action, None, category, None, lineno, append=append)
_add_filter(action, None, category, None, lineno, append=append,
context=context)

def _add_filter(*item, append):
def _add_filter(*item, append, context=_global_context):
with _lock:
filters = context._filters
if not append:
# Remove possible duplicate filters, so new one will be placed
# in correct place. If append=True and duplicate exists, do nothing.
Expand All @@ -200,10 +309,10 @@ def _add_filter(*item, append):
filters.append(item)
_filters_mutated()

def resetwarnings():
def resetwarnings(*, context=_global_context):
"""Clear the list of warning filters, so that no filters are active."""
with _lock:
filters[:] = []
context._filters[:] = []
_filters_mutated()

class _OptionError(Exception):
Expand Down Expand Up @@ -347,7 +456,7 @@ def warn(message, category=None, stacklevel=1, source=None,
warn_explicit(message, category, filename, lineno, module, registry,
globals, source)

def warn_explicit(message, category, filename, lineno,
def _warn_explicit_impl(message, category, filename, lineno,
module=None, registry=None, module_globals=None,
source=None):
lineno = int(lineno)
Expand All @@ -371,7 +480,7 @@ def warn_explicit(message, category, filename, lineno,
if registry.get(key):
return
# Search the filters
for item in filters:
for item in _itertools.chain(get_context()._filters, filters):
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
Expand Down Expand Up @@ -418,6 +527,11 @@ def warn_explicit(message, category, filename, lineno,
_showwarnmsg(msg)


def warn_explicit(*args, **kwargs):
with _lock:
return _warn_explicit_impl(*args, **kwargs)


class WarningMessage(object):

_WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
Expand Down Expand Up @@ -518,6 +632,64 @@ def __exit__(self, *exc_info):
self._module._showwarnmsg_impl = self._showwarnmsg_impl


class local_context:
"""A context manager that copies and restores the warnings filter upon
exiting the context. This uses a context variable so that the filter
changes are thread local and work as expected with asynchronous task
switching.
The 'record' argument specifies whether warnings should be captured rather
than being emitted by warnings.showwarning(). When capture is enabled, the
list of warnings is available as get_context().log.
"""
def __init__(self, *, record=False):
self._record = record
self._entered = False

def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
self._saved_context, context = _new_context()
if self._record:
context.log = []
_filters_mutated()
return context

def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
_warnings_context.set(self._saved_context)
_filters_mutated()


class _CatchManager(local_context):
"""Context manager used by get_context().catch_warnings()."""
def __init__(
self,
*,
record=False,
action=None,
category=Warning,
lineno=0,
append=False,
):
super().__init__(record=record)
if action is None:
self._filter = None
else:
self._filter = (action, category, lineno, append)

def __enter__(self):
context = super().__enter__()
if self._filter is not None:
context.simplefilter(*self._filter)
return context.log

def __exit__(self, *exc_info):
context = super().__exit__(*exc_info)


class deprecated:
"""Indicate that a class, function or overload is deprecated.
Expand Down Expand Up @@ -704,6 +876,7 @@ def extract():
# - a line number for the line being warning, or 0 to mean any line
# If either if the compiled regexs are None, match anything.
try:
raise ImportError # FIXME: temporary, until _warnings is updated
from _warnings import (filters, _defaultaction, _onceregistry,
warn, warn_explicit, _filters_mutated,
_acquire_lock, _release_lock,
Expand Down

0 comments on commit 1f8b441

Please sign in to comment.