|
| 1 | +import itertools |
| 2 | +from multiprocessing import Manager |
1 | 3 | import threading
|
2 | 4 | import time
|
3 | 5 | import weakref
|
@@ -69,6 +71,44 @@ def test_map_timeout(self):
|
69 | 71 |
|
70 | 72 | self.assertEqual([None, None], results)
|
71 | 73 |
|
| 74 | + def test_map_args(self): |
| 75 | + with self.assertRaisesRegex(ValueError, "buffersize must be None or >= 1."): |
| 76 | + self.executor.map(bool, [], buffersize=0) |
| 77 | + with self.assertRaisesRegex( |
| 78 | + ValueError, "cannot specify both buffersize and timeout." |
| 79 | + ): |
| 80 | + self.executor.map(bool, [], timeout=1, buffersize=1) |
| 81 | + |
| 82 | + def test_map_infinite_iterable(self): |
| 83 | + results = self.executor.map(str, itertools.count(1), buffersize=1) |
| 84 | + self.assertEqual(next(iter(results)), "1") |
| 85 | + |
| 86 | + def test_map_buffersize(self): |
| 87 | + manager = Manager() |
| 88 | + |
| 89 | + for buffersize, iterable_size in [ |
| 90 | + (1, 5), |
| 91 | + (5, 5), |
| 92 | + (10, 5), |
| 93 | + ]: |
| 94 | + iterable = range(iterable_size) |
| 95 | + processed_elements = manager.list() |
| 96 | + |
| 97 | + iterator = self.executor.map( |
| 98 | + processed_elements.append, iterable, buffersize=buffersize |
| 99 | + ) |
| 100 | + time.sleep(1) # wait for buffered futures to finish |
| 101 | + self.assertSetEqual( |
| 102 | + set(processed_elements), |
| 103 | + set(range(min(buffersize, iterable_size))), |
| 104 | + ) |
| 105 | + next(iterator) |
| 106 | + time.sleep(1) # wait for the created future to finish |
| 107 | + self.assertSetEqual( |
| 108 | + set(processed_elements), |
| 109 | + set(range(min(buffersize + 1, iterable_size))), |
| 110 | + ) |
| 111 | + |
72 | 112 | def test_shutdown_race_issue12456(self):
|
73 | 113 | # Issue #12456: race condition at shutdown where trying to post a
|
74 | 114 | # sentinel in the call queue blocks (the queue is full while processes
|
|
0 commit comments