Commit efb2dd9

Revisit some tests related to gather_dep

1 parent 62effdf

File tree: 3 files changed (+179, -192 lines)

distributed/diagnostics/tests/test_eventstream.py

Lines changed: 3 additions & 4 deletions
@@ -30,14 +30,13 @@ async def test_eventstream(c, s, *workers):
         name: collections.deque(maxlen=100)
         for name in "start duration key name color worker worker_thread y alpha".split()
     }
-    workers = dict()
+    workers = {}
     for msg in es.buffer:
         task_stream_append(lists, msg, workers)

-    assert len([n for n in lists["name"] if n.startswith("transfer")]) == 2
+    assert sum(n == "transfer-sum" for n in lists["name"]) == 2
     for name, color in zip(lists["name"], lists["color"]):
-        if name == "transfer":
-            assert color == "red"
+        assert (name == "transfer-sum") == (color == "red")

     assert any(c == "black" for c in lists["color"])
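
Note on the new assertion: `(name == "transfer-sum") == (color == "red")` is a
biconditional, so a task-stream bar must be red exactly when its name is
"transfer-sum" and must not be red otherwise, which is stricter than the old
one-directional `if name == "transfer"` check. A minimal sketch with made-up
values (not part of the test) showing the pattern:

    # Hypothetical data; only the assertion pattern comes from the diff above.
    names = ["transfer-sum", "sum", "transfer-sum"]
    colors = ["red", "black", "red"]
    for name, color in zip(names, colors):
        # Passes: red if and only if the name is "transfer-sum".
        assert (name == "transfer-sum") == (color == "red")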

distributed/tests/test_scheduler.py

Lines changed: 92 additions & 103 deletions
@@ -132,115 +132,104 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):


 @pytest.mark.parametrize("ndeps", [0, 1, 4])
-@pytest.mark.parametrize(
-    "nthreads",
-    [
-        [("127.0.0.1", 1)] * 5,
-        [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
-    ],
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
+    config={"distributed.scheduler.work-stealing": False},
 )
-def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
-    @gen_cluster(
-        client=True,
-        nthreads=nthreads,
-        config={"distributed.scheduler.work-stealing": False},
-    )
-    async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers):
-        r"""
-        Ensure that sibling root tasks are scheduled to the same node, reducing future
-        data transfer.
-
-        We generate a wide layer of "root" tasks (random NumPy arrays). All of those
-        tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
-        most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
-        inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
-        (though the number of tasks and workers is different):
-
-            |-W1-|  |-W2-|  |-W3-|  |-W4-|   < ---- ideal task scheduling
-
-              q       r       s       t      < --- `sum-aggregate-`
-             / \     / \     / \     / \
-            i   j   k   l   m   n   o   p    < --- `sum-`
-            |   |   |   |   |   |   |   |
-            a   b   c   d   e   f   g   h    < --- `random-`
-            \   \   \   |   |   /   /   /
-                   TRIVIAL * 0..5
-
-        Neighboring `random-` tasks should be scheduled on the same worker. We test that
-        generally, only one worker holds each row of the array, that the `random-` tasks
-        are never transferred, and that there are few transfers overall.
-        """
-        da = pytest.importorskip("dask.array")
-        np = pytest.importorskip("numpy")
+async def test_decide_worker_coschedule_order_neighbors(c, s, *workers, ndeps):
+    r"""
+    Ensure that sibling root tasks are scheduled to the same node, reducing future
+    data transfer.
+
+    We generate a wide layer of "root" tasks (random NumPy arrays). All of those
+    tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
+    most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
+    inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
+    (though the number of tasks and workers is different):
+
+        |-W1-|  |-W2-|  |-W3-|  |-W4-|   < ---- ideal task scheduling
+
+          q       r       s       t      < --- `sum-aggregate-`
+         / \     / \     / \     / \
+        i   j   k   l   m   n   o   p    < --- `sum-`
+        |   |   |   |   |   |   |   |
+        a   b   c   d   e   f   g   h    < --- `random-`
+        \   \   \   |   |   /   /   /
+               TRIVIAL * 0..5
+
+    Neighboring `random-` tasks should be scheduled on the same worker. We test that
+    generally, only one worker holds each row of the array, that the `random-` tasks
+    are never transferred, and that there are few transfers overall.
+    """
+    da = pytest.importorskip("dask.array")
+    np = pytest.importorskip("numpy")

-        if ndeps == 0:
-            x = da.random.random((100, 100), chunks=(10, 10))
-        else:
+    if ndeps == 0:
+        x = da.random.random((100, 100), chunks=(10, 10))
+    else:

-            def random(**kwargs):
-                assert len(kwargs) == ndeps
-                return np.random.random((10, 10))
+        def random(**kwargs):
+            assert len(kwargs) == ndeps
+            return np.random.random((10, 10))

-            trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}
+        trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}

-            # TODO is there a simpler (non-blockwise) way to make this sort of graph?
-            x = da.blockwise(
-                random,
-                "yx",
-                new_axes={"y": (10,) * 10, "x": (10,) * 10},
-                dtype=float,
-                **trivial_deps,
-            )
+        # TODO is there a simpler (non-blockwise) way to make this sort of graph?
+        x = da.blockwise(
+            random,
+            "yx",
+            new_axes={"y": (10,) * 10, "x": (10,) * 10},
+            dtype=float,
+            **trivial_deps,
+        )

-        xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
-        await xsum
-
-        # Check that each chunk-row of the array is (mostly) stored on the same worker
-        primary_worker_key_fractions = []
-        secondary_worker_key_fractions = []
-        for i, keys in enumerate(x.__dask_keys__()):
-            # Iterate along rows of the array.
-            keys = {stringify(k) for k in keys}
-
-            # No more than 2 workers should have any keys
-            assert sum(any(k in w.data for k in keys) for w in workers) <= 2
-
-            # What fraction of the keys for this row does each worker hold?
-            key_fractions = [
-                len(set(w.data).intersection(keys)) / len(keys) for w in workers
-            ]
-            key_fractions.sort()
-            # Primary worker: holds the highest percentage of keys
-            # Secondary worker: holds the second highest percentage of keys
-            primary_worker_key_fractions.append(key_fractions[-1])
-            secondary_worker_key_fractions.append(key_fractions[-2])
-
-        # There may be one or two rows that were poorly split across workers,
-        # but the vast majority of rows should only be on one worker.
-        assert np.mean(primary_worker_key_fractions) >= 0.9
-        assert np.median(primary_worker_key_fractions) == 1.0
-        assert np.mean(secondary_worker_key_fractions) <= 0.1
-        assert np.median(secondary_worker_key_fractions) == 0.0
-
-        # Check that there were few transfers
-        unexpected_transfers = []
-        for worker in workers:
-            for log in worker.incoming_transfer_log:
-                keys = log["keys"]
-                # The root-ish tasks should never be transferred
-                assert not any(k.startswith("random") for k in keys), keys
-                # `object-` keys (the trivial deps of the root random tasks) should be
-                # transferred
-                if any(not k.startswith("object") for k in keys):
-                    # But not many other things should be
-                    unexpected_transfers.append(list(keys))
-
-        # A transfer at the very end to move aggregated results is fine (necessary with
-        # unbalanced workers in fact), but generally there should be very very few
-        # transfers.
-        assert len(unexpected_transfers) <= 3, unexpected_transfers
-
-    test_decide_worker_coschedule_order_neighbors_()
+    xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
+    await xsum
+
+    # Check that each chunk-row of the array is (mostly) stored on the same worker
+    primary_worker_key_fractions = []
+    secondary_worker_key_fractions = []
+    for i, keys in enumerate(x.__dask_keys__()):
+        # Iterate along rows of the array.
+        keys = {stringify(k) for k in keys}
+
+        # No more than 2 workers should have any keys
+        assert sum(any(k in w.data for k in keys) for w in workers) <= 2
+
+        # What fraction of the keys for this row does each worker hold?
+        key_fractions = [
+            len(set(w.data).intersection(keys)) / len(keys) for w in workers
+        ]
+        key_fractions.sort()
+        # Primary worker: holds the highest percentage of keys
+        # Secondary worker: holds the second highest percentage of keys
+        primary_worker_key_fractions.append(key_fractions[-1])
+        secondary_worker_key_fractions.append(key_fractions[-2])
+
+    # There may be one or two rows that were poorly split across workers,
+    # but the vast majority of rows should only be on one worker.
+    assert np.mean(primary_worker_key_fractions) >= 0.9
+    assert np.median(primary_worker_key_fractions) == 1.0
+    assert np.mean(secondary_worker_key_fractions) <= 0.1
+    assert np.median(secondary_worker_key_fractions) == 0.0
+
+    # Check that there were few transfers
+    unexpected_transfers = []
+    for worker in workers:
+        for log in worker.incoming_transfer_log:
+            keys = log["keys"]
+            # The root-ish tasks should never be transferred
+            assert not any(k.startswith("random") for k in keys), keys
+            # `object-` keys (the trivial deps of the root random tasks) should be
+            # transferred
+            if any(not k.startswith("object") for k in keys):
+                # But not many other things should be
+                unexpected_transfers.append(list(keys))
+
+    # A transfer at the very end to move aggregated results is fine (necessary with
+    # unbalanced workers in fact), but generally there should be very few transfers.
+    assert len(unexpected_transfers) <= 3, unexpected_transfers


 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
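
Note on the refactor above: the old version wrapped an inner `@gen_cluster`-decorated
coroutine inside a parametrized outer function and called it manually; the new version
stacks `@pytest.mark.parametrize` directly on `@gen_cluster` and receives the
parametrized value as a keyword-only argument after `*workers` (it also drops the
`nthreads` parametrization in favor of a single 3/2/1-thread cluster). A minimal sketch
of that pattern, with a hypothetical test name and parameter that are not part of this
commit:

    import pytest

    from distributed.utils_test import gen_cluster


    @pytest.mark.parametrize("n", [1, 2])
    @gen_cluster(client=True)
    async def test_parametrized_example(c, s, *workers, n):
        # c: client, s: scheduler, workers: the cluster's workers.
        # `n` is injected by the parametrize decorator above.
        result = await c.submit(sum, [n, 1])
        assert result == n + 1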
