Skip to content

Commit 8aab5e8

Browse files
authored
Deprecate WorkerState accessors (#6579)
1 parent 1797863 commit 8aab5e8

24 files changed

+398
-372
lines changed

distributed/dashboard/components/worker.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ def update(self):
8888
w = self.worker
8989
d = {
9090
"Stored": [len(w.data)],
91-
"Executing": ["%d / %d" % (w.executing_count, w.nthreads)],
92-
"Ready": [len(w.ready)],
93-
"Waiting": [w.waiting_for_data_count],
94-
"Connections": [len(w.in_flight_workers)],
91+
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
92+
"Ready": [len(w.state.ready)],
93+
"Waiting": [w.state.waiting_for_data_count],
94+
"Connections": [len(w.state.in_flight_workers)],
9595
"Serving": [len(w._comms)],
9696
}
9797
update(self.source, d)
@@ -225,7 +225,7 @@ def __init__(self, worker, **kwargs):
225225
fig = figure(
226226
title="Communication History",
227227
x_axis_type="datetime",
228-
y_range=[-0.1, worker.total_out_connections + 0.5],
228+
y_range=[-0.1, worker.state.total_out_connections + 0.5],
229229
height=150,
230230
tools="",
231231
x_range=x_range,
@@ -247,7 +247,7 @@ def update(self):
247247
{
248248
"x": [time() * 1000],
249249
"out": [len(self.worker._comms)],
250-
"in": [len(self.worker.in_flight_workers)],
250+
"in": [len(self.worker.state.in_flight_workers)],
251251
},
252252
10000,
253253
)
@@ -263,7 +263,7 @@ def __init__(self, worker, **kwargs):
263263
fig = figure(
264264
title="Executing History",
265265
x_axis_type="datetime",
266-
y_range=[-0.1, worker.nthreads + 0.1],
266+
y_range=[-0.1, worker.state.nthreads + 0.1],
267267
height=150,
268268
tools="",
269269
x_range=x_range,
@@ -281,7 +281,7 @@ def __init__(self, worker, **kwargs):
281281
@log_errors
282282
def update(self):
283283
self.source.stream(
284-
{"x": [time() * 1000], "y": [self.worker.executing_count]}, 1000
284+
{"x": [time() * 1000], "y": [self.worker.state.executing_count]}, 1000
285285
)
286286

287287

distributed/dashboard/tests/test_scheduler_bokeh.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ async def test_stealing_events(c, s, a, b):
121121
await wait(futures)
122122
se.update()
123123
assert len(first(se.source.data.values()))
124-
assert b.tasks
125-
assert sum(se.source.data["count"]) >= len(b.tasks)
124+
assert b.state.tasks
125+
assert sum(se.source.data["count"]) >= len(b.state.tasks)
126126

127127

128128
@gen_cluster(client=True)
@@ -133,7 +133,7 @@ async def test_events(c, s, a, b):
133133
slowinc, range(100), delay=0.1, workers=a.address, allow_other_workers=True
134134
)
135135

136-
while not b.tasks:
136+
while not b.state.tasks:
137137
await asyncio.sleep(0.01)
138138

139139
e.update()

distributed/deploy/tests/test_local.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def test_procs():
9090
assert len(c.workers) == 2
9191
assert all(isinstance(w, Worker) for w in c.workers.values())
9292
with Client(c.scheduler.address) as e:
93-
assert all(w.nthreads == 3 for w in c.workers.values())
93+
assert all(w.state.nthreads == 3 for w in c.workers.values())
9494
assert all(isinstance(w, Worker) for w in c.workers.values())
9595
repr(c)
9696

@@ -324,7 +324,7 @@ async def test_defaults_2():
324324
dashboard_address=":0",
325325
asynchronous=True,
326326
) as c:
327-
assert sum(w.nthreads for w in c.workers.values()) == CPU_COUNT
327+
assert sum(w.state.nthreads for w in c.workers.values()) == CPU_COUNT
328328
assert all(isinstance(w, Worker) for w in c.workers.values())
329329
assert len(c.workers) == 1
330330

distributed/deploy/tests/test_spec_cluster.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ async def test_specification():
4242
assert isinstance(cluster.workers[1], Worker)
4343
assert isinstance(cluster.workers["my-worker"], MyWorker)
4444

45-
assert cluster.workers[0].nthreads == 1
46-
assert cluster.workers[1].nthreads == 2
47-
assert cluster.workers["my-worker"].nthreads == 3
45+
assert cluster.workers[0].state.nthreads == 1
46+
assert cluster.workers[1].state.nthreads == 2
47+
assert cluster.workers["my-worker"].state.nthreads == 3
4848

4949
async with Client(cluster, asynchronous=True) as client:
5050
result = await client.submit(lambda x: x + 1, 10)
@@ -69,9 +69,9 @@ def test_spec_sync(loop):
6969
assert isinstance(cluster.workers[1], Worker)
7070
assert isinstance(cluster.workers["my-worker"], MyWorker)
7171

72-
assert cluster.workers[0].nthreads == 1
73-
assert cluster.workers[1].nthreads == 2
74-
assert cluster.workers["my-worker"].nthreads == 3
72+
assert cluster.workers[0].state.nthreads == 1
73+
assert cluster.workers[1].state.nthreads == 2
74+
assert cluster.workers["my-worker"].state.nthreads == 3
7575

7676
with Client(cluster, loop=loop) as client:
7777
assert cluster.loop is cluster.scheduler.loop

distributed/diagnostics/tests/test_worker_plugin.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async def test_normal_task_transitions_called(c, s, w):
106106

107107
await c.register_worker_plugin(plugin)
108108
await c.submit(lambda x: x, 1, key="task")
109-
await async_wait_for(lambda: not w.tasks, timeout=10)
109+
await async_wait_for(lambda: not w.state.tasks, timeout=10)
110110

111111

112112
@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
@@ -148,7 +148,7 @@ async def test_superseding_task_transitions_called(c, s, w):
148148

149149
await c.register_worker_plugin(plugin)
150150
await c.submit(lambda x: x, 1, key="task", resources={"X": 1})
151-
await async_wait_for(lambda: not w.tasks, timeout=10)
151+
await async_wait_for(lambda: not w.state.tasks, timeout=10)
152152

153153

154154
@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
@@ -174,7 +174,7 @@ async def test_dependent_tasks(c, s, w):
174174

175175
await c.register_worker_plugin(plugin)
176176
await c.get(dsk, "task", sync=False)
177-
await async_wait_for(lambda: not w.tasks, timeout=10)
177+
await async_wait_for(lambda: not w.state.tasks, timeout=10)
178178

179179

180180
@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
@@ -207,7 +207,7 @@ class Dummy(WorkerPlugin):
207207
with warnings.catch_warnings(record=True) as record:
208208
await c.register_worker_plugin(Dummy())
209209
assert await c.submit(inc, 1, key="x") == 2
210-
while "x" in a.tasks:
210+
while "x" in a.state.tasks:
211211
await asyncio.sleep(0.01)
212212

213213
assert not record
@@ -235,7 +235,7 @@ def teardown(self, worker):
235235
await c.submit(inc, 0)
236236
assert w.foo == 123
237237

238-
while s.tasks or w.tasks:
238+
while s.tasks or w.state.tasks:
239239
await asyncio.sleep(0.01)
240240

241241
class MyCustomPlugin(WorkerPlugin):

distributed/http/scheduler/tests/test_scheduler_http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ async def fetch_metrics():
144144

145145
# submit a task which should show up in the prometheus scraping
146146
future = c.submit(slowinc, 1, delay=0.5)
147-
while not any(future.key in w.tasks for w in [a, b]):
147+
while not any(future.key in w.state.tasks for w in [a, b]):
148148
await asyncio.sleep(0.001)
149149

150150
active_metrics, forgotten_tasks = await fetch_metrics()
@@ -157,7 +157,7 @@ async def fetch_metrics():
157157

158158
future.release()
159159

160-
while any(future.key in w.tasks for w in [a, b]):
160+
while any(future.key in w.state.tasks for w in [a, b]):
161161
await asyncio.sleep(0.001)
162162

163163
active_metrics, forgotten_tasks = await fetch_metrics()

distributed/http/worker/prometheus/core.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ def collect(self):
2727
labels=["state"],
2828
)
2929
tasks.add_metric(["stored"], len(self.server.data))
30-
tasks.add_metric(["executing"], self.server.executing_count)
31-
tasks.add_metric(["ready"], len(self.server.ready))
32-
tasks.add_metric(["waiting"], self.server.waiting_for_data_count)
30+
tasks.add_metric(["executing"], self.server.state.executing_count)
31+
tasks.add_metric(["ready"], len(self.server.state.ready))
32+
tasks.add_metric(["waiting"], self.server.state.waiting_for_data_count)
3333
yield tasks
3434

3535
yield GaugeMetricFamily(
3636
self.build_name("concurrent_fetch_requests"),
3737
"Number of open fetch requests to other workers.",
38-
value=len(self.server.in_flight_workers),
38+
value=len(self.server.state.in_flight_workers),
3939
)
4040

4141
yield GaugeMetricFamily(
4242
self.build_name("threads"),
4343
"Number of worker threads.",
44-
value=self.server.nthreads,
44+
value=self.server.state.nthreads,
4545
)
4646

4747
yield GaugeMetricFamily(

distributed/tests/test_active_memory_manager.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -459,12 +459,12 @@ async def test_drop_with_paused_workers_with_running_tasks_1(c, s, a, b):
459459
x = (await c.scatter({"x": 1}, broadcast=True))["x"]
460460
y = c.submit(slowinc, x, delay=2.5, key="y", workers=[a.address])
461461

462-
while "y" not in a.tasks or a.tasks["y"].state != "executing":
462+
while "y" not in a.state.tasks or a.state.tasks["y"].state != "executing":
463463
await asyncio.sleep(0.01)
464464
a.status = Status.paused
465465
while s.workers[a.address].status != Status.paused:
466466
await asyncio.sleep(0.01)
467-
assert a.tasks["y"].state == "executing"
467+
assert a.state.tasks["y"].state == "executing"
468468

469469
s.extensions["amm"].run_once()
470470
await y
@@ -509,7 +509,7 @@ async def test_drop_with_paused_workers_with_running_tasks_3_4(c, s, a, b, pause
509509
"""
510510
x = (await c.scatter({"x": 1}, broadcast=True))["x"]
511511
y = c.submit(slowinc, x, delay=2.5, key="y", workers=[a.address])
512-
while "y" not in a.tasks or a.tasks["y"].state != "executing":
512+
while "y" not in a.state.tasks or a.state.tasks["y"].state != "executing":
513513
await asyncio.sleep(0.01)
514514

515515
if pause:
@@ -519,7 +519,7 @@ async def test_drop_with_paused_workers_with_running_tasks_3_4(c, s, a, b, pause
519519
await asyncio.sleep(0.01)
520520

521521
assert s.tasks["y"].state == "processing"
522-
assert a.tasks["y"].state == "executing"
522+
assert a.state.tasks["y"].state == "executing"
523523

524524
s.extensions["amm"].run_once()
525525
await y
@@ -544,10 +544,10 @@ async def test_drop_with_paused_workers_with_running_tasks_5(c, s, w1, w2, w3):
544544

545545
def executing() -> bool:
546546
return (
547-
"y1" in w1.tasks
548-
and w1.tasks["y1"].state == "executing"
549-
and "y2" in w3.tasks
550-
and w3.tasks["y2"].state == "executing"
547+
"y1" in w1.state.tasks
548+
and w1.state.tasks["y1"].state == "executing"
549+
and "y2" in w3.state.tasks
550+
and w3.state.tasks["y2"].state == "executing"
551551
)
552552

553553
while not executing():

distributed/tests/test_actor.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async def test_client_actions(s, a, b, direct_to_workers):
7878

7979
assert counter._address == a.address
8080

81-
assert isinstance(a.actors[counter.key], Counter)
81+
assert isinstance(a.state.actors[counter.key], Counter)
8282
assert s.tasks[counter.key].actor
8383

8484
await asyncio.gather(counter.increment(), counter.increment())
@@ -186,7 +186,7 @@ async def test_gc(c, s, a, b):
186186
await wait(actor)
187187
del actor
188188

189-
while a.actors or b.actors:
189+
while a.state.actors or b.state.actors:
190190
await asyncio.sleep(0.01)
191191

192192

@@ -200,15 +200,15 @@ async def test_track_dependencies(c, s, a, b):
200200

201201
await asyncio.sleep(0.3)
202202

203-
assert a.actors or b.actors
203+
assert a.state.actors or b.state.actors
204204

205205

206206
@gen_cluster(client=True)
207207
async def test_future(c, s, a, b):
208208
counter = c.submit(Counter, actor=True, workers=[a.address])
209209
assert isinstance(counter, Future)
210210
await wait(counter)
211-
assert isinstance(a.actors[counter.key], Counter)
211+
assert isinstance(a.state.actors[counter.key], Counter)
212212

213213
counter = await counter
214214
assert isinstance(counter, Actor)
@@ -364,7 +364,7 @@ def add(n, counter):
364364
while not done.done():
365365
assert (
366366
len([ws for ws in s.workers.values() if ws.processing])
367-
<= a.nthreads + b.nthreads
367+
<= a.state.nthreads + b.state.nthreads
368368
)
369369
await asyncio.sleep(0.01)
370370

@@ -430,7 +430,7 @@ def __init__(self, x, y=None):
430430
actors = c.map(Foo, range(10), y=b, actor=True)
431431
await wait(actors)
432432

433-
assert all(len(w.actors) == 2 for w in workers)
433+
assert all(len(w.state.actors) == 2 for w in workers)
434434

435435

436436
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4, Worker=Nanny)
@@ -588,7 +588,7 @@ async def test_worker_actor_handle_is_weakref(c, s, a, b):
588588
del counter
589589

590590
start = time()
591-
while a.actors or b.data:
591+
while a.state.actors or b.data:
592592
await asyncio.sleep(0.1)
593593
assert time() < start + 30
594594

0 commit comments

Comments
 (0)