Skip to content

Commit

Permalink
Merge tag 'v3.14.0' into cpython
Browse files Browse the repository at this point in the history
  • Loading branch information
jaraco committed Feb 18, 2023
2 parents a018d05 + 44bf875 commit 056c919
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 43 deletions.
30 changes: 30 additions & 0 deletions Lib/test/test_zipfile/_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import contextlib
import time


class DeadlineExceeded(Exception):
pass


class TimedContext(contextlib.ContextDecorator):
"""
A context that will raise DeadlineExceeded if the
max duration is reached during the execution.
>>> TimedContext(1)(time.sleep)(.1)
>>> TimedContext(0)(time.sleep)(.1)
Traceback (most recent call last):
...
tests._context.DeadlineExceeded: (..., 0)
"""

def __init__(self, max_duration: int):
self.max_duration = max_duration

def __enter__(self):
self.start = time.monotonic()

def __exit__(self, *err):
duration = time.monotonic() - self.start
if duration > self.max_duration:
raise DeadlineExceeded(duration, self.max_duration)
8 changes: 8 additions & 0 deletions Lib/test/test_zipfile/_func_timeout_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
try:
from func_timeout import func_set_timeout as set_timeout
except ImportError: # pragma: no cover
# provide a fallback that doesn't actually time out
from ._context import TimedContext as set_timeout


__all__ = ['set_timeout']
29 changes: 29 additions & 0 deletions Lib/test/test_zipfile/_itertools.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
import itertools


# from jaraco.itertools 6.3.0
class Counter:
"""
Wrap an iterable in an object that stores the count of items
that pass through it.
>>> items = Counter(range(20))
>>> items.count
0
>>> values = list(items)
>>> items.count
20
"""

def __init__(self, i):
self.count = 0
self.iter = zip(itertools.count(1), i)

def __iter__(self):
return self

def __next__(self):
self.count, result = next(self.iter)
return result


# from more_itertools v8.13.0
def always_iterable(obj, base_type=(str, bytes)):
if obj is None:
Expand Down
165 changes: 130 additions & 35 deletions Lib/test/test_zipfile/test_path.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,28 @@
import io
import zipfile
import itertools
import contextlib
import pathlib
import unittest
import string
import pickle
import itertools
import string
import sys
import unittest
import zipfile

from ._test_params import parameterize, Invoked
from ._functools import compose
from ._itertools import Counter

from ._test_params import parameterize, Invoked
from ._func_timeout_compat import set_timeout

from test.support.os_helper import temp_dir


# Poor man's technique to consume a (smallish) iterable.
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
class itertools:
class Counter:
def __init__(self, i):
self.count = 0
self._orig_iter = iter(i)
Counter = Counter

def __iter__(self):
return self

def __next__(self):
result = next(self._orig_iter)
self.count += 1
return result
consume = tuple


def add_dirs(zf):
Expand Down Expand Up @@ -145,7 +135,66 @@ def test_open(self, alpharep):
a, b, g = root.iterdir()
with a.open(encoding="utf-8") as strm:
data = strm.read()
assert data == "content of a"
self.assertEqual(data, "content of a")
with a.open('r', "utf-8") as strm: # not a kw, no gh-101144 TypeError
data = strm.read()
self.assertEqual(data, "content of a")

def test_open_encoding_utf16(self):
in_memory_file = io.BytesIO()
zf = zipfile.ZipFile(in_memory_file, "w")
zf.writestr("path/16.txt", "This was utf-16".encode("utf-16"))
zf.filename = "test_open_utf16.zip"
root = zipfile.Path(zf)
(path,) = root.iterdir()
u16 = path.joinpath("16.txt")
with u16.open('r', "utf-16") as strm:
data = strm.read()
assert data == "This was utf-16"
with u16.open(encoding="utf-16") as strm:
data = strm.read()
assert data == "This was utf-16"

def test_open_encoding_errors(self):
in_memory_file = io.BytesIO()
zf = zipfile.ZipFile(in_memory_file, "w")
zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.")
zf.filename = "test_read_text_encoding_errors.zip"
root = zipfile.Path(zf)
(path,) = root.iterdir()
u16 = path.joinpath("bad-utf8.bin")

# encoding= as a positional argument for gh-101144.
data = u16.read_text("utf-8", errors="ignore")
assert data == "invalid utf-8: ."
with u16.open("r", "utf-8", errors="surrogateescape") as f:
assert f.read() == "invalid utf-8: \udcff\udcff."

# encoding= both positional and keyword is an error; gh-101144.
with self.assertRaisesRegex(TypeError, "encoding"):
data = u16.read_text("utf-8", encoding="utf-8")

# both keyword arguments work.
with u16.open("r", encoding="utf-8", errors="strict") as f:
# error during decoding with wrong codec.
with self.assertRaises(UnicodeDecodeError):
f.read()

@unittest.skipIf(
not getattr(sys.flags, 'warn_default_encoding', 0),
"Requires warn_default_encoding",
)
@pass_alpharep
def test_encoding_warnings(self, alpharep):
"""EncodingWarning must blame the read_text and open calls."""
assert sys.flags.warn_default_encoding
root = zipfile.Path(alpharep)
with self.assertWarns(EncodingWarning) as wc:
root.joinpath("a.txt").read_text()
assert __file__ == wc.filename
with self.assertWarns(EncodingWarning) as wc:
root.joinpath("a.txt").open("r").close()
assert __file__ == wc.filename

def test_open_write(self):
"""
Expand Down Expand Up @@ -187,6 +236,8 @@ def test_read(self, alpharep):
root = zipfile.Path(alpharep)
a, b, g = root.iterdir()
assert a.read_text(encoding="utf-8") == "content of a"
# Also check positional encoding arg (gh-101144).
assert a.read_text("utf-8") == "content of a"
assert a.read_bytes() == b"content of a"

@pass_alpharep
Expand All @@ -211,19 +262,6 @@ def test_traverse_truediv(self, alpharep):
e = root / "b" / "d" / "e.txt"
assert e.read_text(encoding="utf-8") == "content of e"

@pass_alpharep
def test_traverse_simplediv(self, alpharep):
"""
Disable the __future__.division when testing traversal.
"""
code = compile(
source="zipfile.Path(alpharep) / 'a'",
filename="(test)",
mode="eval",
dont_inherit=True,
)
eval(code)

@pass_alpharep
def test_pathlike_construction(self, alpharep):
"""
Expand Down Expand Up @@ -292,7 +330,7 @@ def test_joinpath_constant_time(self):
# Check the file iterated all items
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

# @func_timeout.func_set_timeout(3)
@set_timeout(3)
def test_implied_dirs_performance(self):
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
zipfile.CompleteDirs._implied_dirs(data)
Expand Down Expand Up @@ -408,6 +446,52 @@ def test_root_unnamed(self, alpharep):
assert sub.name == "b"
assert sub.parent

@pass_alpharep
def test_match_and_glob(self, alpharep):
root = zipfile.Path(alpharep)
assert not root.match("*.txt")

assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]

files = root.glob("**/*.txt")
assert all(each.match("*.txt") for each in files)

assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))

def test_glob_empty(self):
root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
with self.assertRaises(ValueError):
root.glob('')

@pass_alpharep
def test_eq_hash(self, alpharep):
root = zipfile.Path(alpharep)
assert root == zipfile.Path(alpharep)

assert root != (root / "a.txt")
assert (root / "a.txt") == (root / "a.txt")

root = zipfile.Path(alpharep)
assert root in {root}

@pass_alpharep
def test_is_symlink(self, alpharep):
"""
See python/cpython#82102 for symlink support beyond this object.
"""

root = zipfile.Path(alpharep)
assert not root.is_symlink()

@pass_alpharep
def test_relative_to(self, alpharep):
root = zipfile.Path(alpharep)
relative = root.joinpath("b", "c.txt").relative_to(root / "b")
assert str(relative) == "c.txt"

relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
assert str(relative) == "d/e.txt"

@pass_alpharep
def test_inheritance(self, alpharep):
cls = type('PathChild', (zipfile.Path,), {})
Expand All @@ -429,3 +513,14 @@ def test_pickle(self, alpharep, path_type, subpath):
restored_1 = pickle.loads(saved_1)
first, *rest = restored_1.iterdir()
assert first.read_text().startswith('content of ')

@pass_alpharep
def test_extract_orig_with_implied_dirs(self, alpharep):
"""
A zip file wrapped in a Path should extract even with implied dirs.
"""
source_path = self.zipfile_ondisk(alpharep)
zf = zipfile.ZipFile(source_path)
# wrap the zipfile for its side effect
zipfile.Path(zf)
zf.extractall(source_path.parent)
Loading

0 comments on commit 056c919

Please sign in to comment.