Skip to content

Commit

Permalink
test_stream: use specialized assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Jan 17, 2025
1 parent ead82c9 commit 38801df
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def range_raising_at_exhaustion(
class TestStream(unittest.TestCase):
def test_init(self) -> None:
stream = Stream(src)
self.assertEqual(
self.assertIs(
stream._source,
src,
msg="The stream's `source` must be the source argument.",
Expand Down Expand Up @@ -644,7 +644,7 @@ def test_flatten_concurrency(self) -> None:
).flatten(concurrency=2),
times=3,
)
self.assertEqual(
self.assertListEqual(
res,
["a", "b"] * iterable_size + ["c"] * iterable_size,
msg="`flatten` should process 'a's and 'b's concurrently and then 'c's",
Expand Down Expand Up @@ -828,13 +828,13 @@ def test_skip(self) -> None:
Stream(src).skip()

for count in [0, 1, 3]:
self.assertEqual(
self.assertListEqual(
list(Stream(src).skip(count)),
list(src)[count:],
msg="`skip` must skip `count` elements",
)

self.assertEqual(
self.assertListEqual(
list(
Stream(map(throw_for_odd_func(TestError), src))
.skip(count)
Expand All @@ -844,13 +844,13 @@ def test_skip(self) -> None:
msg="`skip` must not count exceptions as skipped elements",
)

self.assertEqual(
self.assertListEqual(
list(Stream(src).skip(until=lambda n: n >= count)),
list(src)[count:],
msg="`skip` must yield starting from the first element satisfying `until`",
)

self.assertEqual(
self.assertListEqual(
list(Stream(src).skip(until=lambda n: False)),
[],
msg="`skip` must not yield any element if `until` is never satisfied",
Expand All @@ -863,22 +863,22 @@ def test_truncate(self) -> None:
):
Stream(src).truncate()

self.assertEqual(
self.assertListEqual(
list(Stream(src).truncate(N * 2)),
list(src),
msg="`truncate` must be ok with count >= stream length",
)
self.assertEqual(
self.assertListEqual(
list(Stream(src).truncate(2)),
[0, 1],
msg="`truncate` must be ok with count >= 1",
)
self.assertEqual(
self.assertListEqual(
list(Stream(src).truncate(1)),
[0],
msg="`truncate` must be ok with count == 1",
)
self.assertEqual(
self.assertListEqual(
list(Stream(src).truncate(0)),
[],
msg="`truncate` must be ok with count == 0",
Expand Down Expand Up @@ -908,7 +908,7 @@ def test_truncate(self) -> None:
):
next(raising_stream_iterator)

self.assertEqual(list(raising_stream_iterator), list(range(1, count + 1)))
self.assertListEqual(list(raising_stream_iterator), list(range(1, count + 1)))

with self.assertRaises(
StopIteration,
Expand All @@ -917,7 +917,7 @@ def test_truncate(self) -> None:
next(raising_stream_iterator)

iter_truncated_on_predicate = iter(Stream(src).truncate(when=lambda n: n == 5))
self.assertEqual(
self.assertListEqual(
list(iter_truncated_on_predicate),
list(Stream(src).truncate(5)),
msg="`when` n == 5 must be equivalent to `count` = 5",
Expand All @@ -934,13 +934,13 @@ def test_truncate(self) -> None:
):
list(Stream(src).truncate(when=lambda _: 1 / 0))

self.assertEqual(
self.assertListEqual(
list(Stream(src).truncate(6, when=lambda n: n == 5)),
list(range(5)),
msg="`when` and `count` argument can be set at the same time, and the truncation should happen as soon as one or the other is satisfied.",
)

self.assertEqual(
self.assertListEqual(
list(Stream(src).truncate(5, when=lambda n: n == 6)),
list(range(5)),
msg="`when` and `count` argument can be set at the same time, and the truncation should happen as soon as one or the other is satisfied.",
Expand Down Expand Up @@ -1108,7 +1108,7 @@ def f(i):
size=3, by=lambda n: throw(StopIteration) if n == 2 else n
)
)
self.assertEqual(
self.assertListEqual(
[next(stream_iter), next(stream_iter)],
[[0], [1]],
msg="`group` should yield incomplete groups when `by` raises",
Expand All @@ -1119,7 +1119,7 @@ def f(i):
msg="`group` should raise and skip `elem` if `by(elem)` raises",
):
next(stream_iter)
self.assertEqual(
self.assertListEqual(
next(stream_iter),
[3],
msg="`group` should continue yielding after `by`'s exception has been raised.",
Expand Down Expand Up @@ -1180,7 +1180,7 @@ def slow_first_elem(elem: int):
with self.subTest(stream=stream):
duration, res = timestream(stream)

self.assertEqual(
self.assertListEqual(
res,
expected_elems,
msg="`throttle` with `interval` must yield upstream elements",
Expand Down Expand Up @@ -1226,7 +1226,7 @@ def slow_first_elem(elem: int):
]:
with self.subTest(N=N, stream=stream):
duration, res = timestream(stream)
self.assertEqual(
self.assertListEqual(
res,
expected_elems,
msg="`throttle` with `per_second` must yield upstream elements",
Expand Down Expand Up @@ -1260,18 +1260,18 @@ def slow_first_elem(elem: int):
)

def test_distinct(self) -> None:
self.assertEqual(
self.assertListEqual(
list(Stream("abbcaabcccddd").distinct()),
list("abcd"),
msg="`distinct` should yield distinct elements",
)
self.assertEqual(
self.assertListEqual(
list(Stream("aabbcccaabbcccc").distinct(consecutive_only=True)),
list("abcabc"),
msg="`distinct` should only remove the duplicates that are consecutive if `consecutive_only=True`",
)
for consecutive_only in [True, False]:
self.assertEqual(
self.assertListEqual(
list(
Stream(["foo", "bar", "a", "b"]).distinct(
len, consecutive_only=consecutive_only
Expand All @@ -1280,7 +1280,7 @@ def test_distinct(self) -> None:
["foo", "a"],
msg="`distinct` should yield the first encountered elem among duplicates",
)
self.assertEqual(
self.assertListEqual(
list(Stream([]).distinct(consecutive_only=consecutive_only)),
[],
msg="`distinct` should yield zero elements on empty stream",
Expand All @@ -1293,7 +1293,7 @@ def test_distinct(self) -> None:
list(Stream([[1]]).distinct())

def test_catch(self) -> None:
self.assertEqual(
self.assertListEqual(
list(Stream(src).catch(finally_raise=True)),
list(src),
msg="`catch` should yield elements in exception-less scenarios",
Expand Down Expand Up @@ -1375,7 +1375,7 @@ def f(i):
only_catched_errors_stream = Stream(
map(lambda _: throw(TestError), range(2000))
).catch(TestError)
self.assertEqual(
self.assertListEqual(
list(only_catched_errors_stream),
[],
msg="When upstream raise exceptions without yielding any element, listing the stream must return empty list, without recursion issue.",
Expand Down Expand Up @@ -1417,7 +1417,7 @@ def f(i):
)
)

self.assertEqual(
self.assertListEqual(
list(
Stream(map(lambda n: 1 / n, [0, 1, 2, 4])).catch(
ZeroDivisionError, replacement=float("inf")
Expand All @@ -1426,7 +1426,7 @@ def f(i):
[float("inf"), 1, 0.5, 0.25],
msg="`catch` should be able to yield a non-None replacement",
)
self.assertEqual(
self.assertListEqual(
list(
Stream(map(lambda n: 1 / n, [0, 1, 2, 4])).catch(
ZeroDivisionError, replacement=cast(float, None)
Expand Down Expand Up @@ -1487,7 +1487,7 @@ def test_call(self) -> None:
stream,
msg="`__call__` should return the stream.",
)
self.assertEqual(
self.assertListEqual(
l,
list(src),
msg="`__call__` should exhaust the stream.",
Expand All @@ -1496,7 +1496,7 @@ def test_call(self) -> None:
def test_multiple_iterations(self) -> None:
stream = Stream(src)
for _ in range(3):
self.assertEqual(
self.assertListEqual(
list(stream),
list(src),
msg="The first iteration over a stream should yield the same elements as any subsequent iteration on the same stream, even if it is based on a `source` returning an iterator that only support 1 iteration.",
Expand Down

0 comments on commit 38801df

Please sign in to comment.