Client submit with workers doesn’t handle new joining workers correctly #8862

Open
@YuriFeigin

Description

Tasks submitted with client.submit() and the workers argument start running on workers outside that list when a worker named in the list joins after the submit call.

To reproduce it, first run a scheduler and two workers:

dask scheduler
dask worker tcp://192.0.0.100:8786 --nthreads 1 --name a-0
dask worker tcp://192.0.0.100:8786 --nthreads 1 --name a-1

Then submit the jobs:

import time
from dask.distributed import Client, get_worker

client = Client("tcp://192.0.0.100:8786")


def run_me(i):
    # Report which worker picked up the task, then keep it busy for a while
    worker = get_worker()
    print(f"worker name {worker.name}, run {i}")
    time.sleep(30)


# Restrict every task to workers a-0 and b-0 (b-0 has not joined yet)
futures = []
for i in range(100):
    futures.append(client.submit(run_me, i, workers=["a-0", "b-0"]))

# Block until all tasks have finished
for f in futures:
    f.result()

This works properly: only worker a-0 gets jobs.
Now add one more worker:

dask worker tcp://192.0.0.100:8786 --nthreads 1 --name b-0

Now worker a-1 starts getting jobs as well.
P.S. If I add a worker named b-1 instead of b-0, everything works correctly, so the problem only happens when the newly added worker's name is part of the workers argument.
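
To confirm the misplacement without reading worker logs, the worker names reported by the tasks can be compared against the requested list. The following is only a minimal sketch: summarize_placement is a hypothetical helper, not part of Dask, and the sample data is made up to mirror the behaviour described above.

```python
from collections import Counter


def summarize_placement(ran_on, allowed):
    """Count tasks per worker name and list the names outside `allowed`.

    ran_on:  iterable of worker names, one entry per finished task
    allowed: worker names passed to client.submit(..., workers=...)
    """
    allowed = set(allowed)
    counts = Counter(ran_on)
    unexpected = sorted(name for name in counts if name not in allowed)
    return dict(counts), unexpected


# Hypothetical data shaped like the report: a-1 picked up a task even though
# only a-0 and b-0 were requested.
counts, unexpected = summarize_placement(
    ["a-0", "a-0", "b-0", "a-1"], allowed=["a-0", "b-0"]
)
print(counts)
print(unexpected)
```

Against a live cluster, one way to feed this helper (assuming run_me is changed to return get_worker().name) is to pass client.gather(futures) as ran_on; a non-empty second return value would then indicate that the workers restriction was not honoured.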

  • Dask version: 2024.8.1
  • Python version: 3.10.14
  • Operating System: macOS
  • Install method (conda, pip, source): pip
