
Automatically restart P2P shuffles when output worker leaves #7970

Merged
88 commits merged into dask:main on Jul 24, 2023

Conversation

@hendrikmakait (Member) commented Jul 5, 2023

Closes #7353

Blocked by and includes #7967
Blocked by and includes #7979
Blocked by and includes #7981
Blocked by and includes #7974

  • Tests added / passed
  • Passes pre-commit run --all-files

github-actions bot (Contributor) commented Jul 5, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files ±0   20 suites ±0   11h 35m 10s ⏱️ +10h 14m 44s
3 738 tests +434   3 623 ✔️ +553   106 💤 −126   9 ❌ +8
36 154 runs +29 467   34 397 ✔️ +28 504   1 748 💤 +971   9 ❌ +8

For more details on these failures, see this check.

Results for commit 5764f45. ± Comparison against base commit efc7eeb.

This pull request skips 1 and un-skips 138 tests.
distributed.shuffle.tests.test_merge
distributed.cli.tests.test_dask_worker ‑ test_dashboard_non_standard_ports
distributed.cli.tests.test_dask_worker ‑ test_listen_address_ipv6[tcp://:---nanny]
distributed.cli.tests.test_dask_worker ‑ test_listen_address_ipv6[tcp://:---no-nanny]
distributed.cli.tests.test_dask_worker ‑ test_listen_address_ipv6[tcp://[::1]:---nanny]
distributed.cli.tests.test_dask_worker ‑ test_listen_address_ipv6[tcp://[::1]:---no-nanny]
distributed.comm.tests.test_comms ‑ test_default_client_server_ipv6[asyncio]
distributed.comm.tests.test_comms ‑ test_default_client_server_ipv6[tornado]
distributed.comm.tests.test_comms ‑ test_tcp_client_server_ipv6[asyncio]
distributed.comm.tests.test_comms ‑ test_tcp_client_server_ipv6[tornado]
distributed.comm.tests.test_comms ‑ test_tls_client_server_ipv6[asyncio]
…

♻️ This comment has been updated with latest results.

distributed/scheduler.py — two outdated review threads (resolved)
@hendrikmakait (Member Author) commented:

@wence-: Your feedback should be addressed and blocking PRs are merged, so this should be good for another round.

@phofl (Collaborator) left a comment:

Generally looks good to me, but not familiar enough with the code-base to be the final reviewer

@fjetter (Member) left a comment:

Most of my comments are nits and you can ignore or use them.

The one thing I'm concerned about is that the coverage report indicates some rather critical code sections are not covered. We should look into that before merging

Comment on lines 398 to 404
 try:
-    shuffle = self.states[shuffle_id]
+    shuffle = self.active_shuffles[shuffle_id]
 except KeyError:
-    return
-self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
-self._clean_on_scheduler(shuffle_id)
+    pass
+else:
+    self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
+    self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)
Member:

This is more a style note and I typically try to avoid style questions in a PR review. Still, this feels a bit convoluted. I believe something like

if shuffle := self.active_shuffles.get(shuffle_id):
    self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
    self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)
elif finish == "forgotten":
    ...

is easier to read than try/except/pass/else, with or without the walrus.

Member:

Feel free to ignore. Logic is the same in the end

Member Author:

Fair point, this bit has gotten out of hand.

Comment on lines 54 to 55
def __eq__(self, other: Any) -> bool:
    return type(other) == type(self) and other.run_id == self.run_id
Member:

This appears not to be covered by tests. Why is it necessary then?

Member Author:

It had some use in an earlier iteration. Removed.

Contributor:

My guess is because __hash__ is now not the default and this addition of __eq__ ensures that __eq__ and the newly defined __hash__ are consistent.

Contributor:

I suppose this is because the run_id is a unique token that defines the shuffle state object.

Member Author:

I suppose this is because the run_id is a unique token that defines the shuffle state object.

That's why I initially had a new __eq__. Then __hash__ had to match it. Now I'm only using __hash__, so I think there's no need for a custom __eq__ that could potentially get outdated.
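For illustration, a minimal sketch of the remaining piece, assuming __hash__ is keyed on run_id (the exact attribute choice is made by the PR, not by this sketch):

def __hash__(self) -> int:
    # Hash on the unique run_id; with the default identity-based __eq__,
    # equal objects are the same object, so hash/eq consistency holds trivially.
    return hash(self.run_id)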

Comment on lines 97 to 98
except ShuffleClosedError:
    raise Reschedule()
Member:

This lack of coverage makes me nervous. I think around the barrier there are various races we should test.

@hendrikmakait (Member Author) commented Jul 21, 2023:

I haven't been able to come up with a scenario where this would be triggered (and relevant), so I've removed it for now. If this ever pops up for somebody, I hope they'll send a bug report our way.

self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)

if finish == "forgotten":
    shuffles = self._shuffles.pop(shuffle_id)
Member:

IIUC this entire logic is just there to clean up. Behavior would not be impacted if we didn't do any of this, correct?

Member Author:

Yes, this is state cleanup on the scheduler plugin.

recs.update({dt.key: "released"})

if barrier_task.state == "erred":
    return {}  # pragma: no cover
Member:

why don't you want coverage to detect this? Seems like an important case

Member Author:

Added a comment to explain

Contributor:

This seems like an ideal case for an assert False, "Invariant broken" ?

Member Author:

That would also work. I'm wondering whether assert False is the right thing to add here, given that PYTHONOPTIMIZE strips asserts. It would work as an addition, though.

Member Author:

Raising a RuntimeError now.
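A hedged sketch of what that replacement might look like (not the exact diff; the message text is illustrative):

if barrier_task.state == "erred":
    # Instead of silently bailing out, surface the broken invariant loudly.
    # Unlike a bare assert, this survives PYTHONOPTIMIZE.
    raise RuntimeError(
        f"Expected barrier task {barrier_task.key!r} not to be in state 'erred'."
    )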


for dt in barrier_task.dependencies:
    if dt.state == "erred":
        return {}  # pragma: no cover
Member:

same here

Member Author:

Added a comment to explain

Contributor:

Similarly here.

Comment on lines 879 to 880
while self._runs:
    await asyncio.sleep(0.1)
Member:

I don't insist on this but I don't like these sleep patterns.

def __init__(...):
    self._runs_condition = asyncio.Condition()

async def _close_shuffle_run(self, shuffle: ShuffleRun) -> None:
    await shuffle.close()
    async with self._runs_condition:
        self._runs.remove(shuffle)
        self._runs_condition.notify_all()

async def teardown(self, worker: Worker) -> None:
    ...
    async with self._runs_condition:
        await self._runs_condition.wait_for(lambda: not self._runs)

would be a clean alternative. Many people consider Conditions too complex but what I like about them is that they make this relationship very clear (and they unblock immediately which is nice for testing and such things).

As I said, I don't insist on this

Member:

A more serious question: is it possible for _runs to be repopulated at this point, or are we locking everything up properly so that this cannot happen once we reach this point?

Member Author:

Makes sense! I've added the condition. At this point the plugin is closed which will raise a ShuffleClosedError before a new run can be added.

Contributor:

_runs is added to in _refresh_shuffle which doesn't have a lock associated with it. But I am not sure if that can be running simultaneously with teardown.

Member Author:

teardown sets

async def teardown(self, worker: Worker) -> None:
    assert not self.closed
    self.closed = True

Once that is done,

if self.closed:
    raise ShuffleClosedError(f"{self} has already been closed")

will raise. We don't yield the event loop in between that raise and self._runs.add(shuffle).
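An illustrative sketch of that ordering argument (simplified; _fetch_shuffle_metadata is a hypothetical stand-in for the scheduler round-trip, not the actual method name):

async def _refresh_shuffle(self, shuffle_id: ShuffleId) -> ShuffleRun:
    result = await self._fetch_shuffle_metadata(shuffle_id)  # may yield to the event loop
    if self.closed:
        raise ShuffleClosedError(f"{self} has already been closed")
    # _create_shuffle_run mutates self._runs synchronously, so teardown()
    # cannot interleave between the check above and the add.
    return self._create_shuffle_run(shuffle_id, result)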

@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 2,
    config={"distributed.scheduler.allowed-failures": 0},
Member:

does this mean that P2P is now retried allowed-failures times if a worker goes OOM?

Member:

Wouldn't be a dealbreaker but I also don't think this is useful. It's very unlikely that another P2P run attempt would be more successful.

However, there are of course also cases like spot interruption where this matters... Never mind!

@hendrikmakait (Member Author) commented Jul 20, 2023:

does this mean that P2P is now retried allowed-failures times if a worker goes OOM?

Yes, as there might be other causes apart from an output partition being too large.
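For reference, a sketch of how that retry budget is controlled; the value here is illustrative, not a recommendation:

import dask

# allowed-failures bounds how often a task may be implicated in worker deaths
# before being marked erred; with restartable P2P, a failed shuffle run can
# therefore be retried up to that many times.
with dask.config.set({"distributed.scheduler.allowed-failures": 2}):
    ...  # build and compute the shuffle here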

@@ -578,12 +699,39 @@ async def test_closed_worker_during_unpack(c, s, a, b):
        freq="10 s",
    )
    out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
    out = out.persist()
    x, y = c.compute([df.x.size, out.x.size])
    await wait_for_tasks_in_state("shuffle-p2p", "memory", 1, b)
Member:

I guess this is out of scope for this PR, but I think it would make sense to have an API to easily get access to the actual shuffle instances held by the plugins, plus a stage attribute that indicates whether we're in the transfer, barrier, or unpack stage.
I would find this kind of verification nicer than waiting for task states.
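A purely hypothetical sketch of such a test helper (nothing like this exists in the PR; the plugin lookup and the stage attribute are assumptions):

def get_shuffle_run(worker: Worker, shuffle_id: ShuffleId) -> ShuffleRun:
    # Assumed lookup; the worker-side plugin keeps its active runs in .shuffles.
    plugin = worker.plugins["shuffle"]  # registration name is an assumption
    return plugin.shuffles[shuffle_id]

# A test could then wait on e.g. run.stage == "barrier" instead of task states.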


Comment on lines +835 to +889
 def _create_shuffle_run(
     self, shuffle_id: ShuffleId, result: dict[str, Any]
 ) -> ShuffleRun:
     shuffle: ShuffleRun
     if result["type"] == ShuffleType.DATAFRAME:
-        shuffle = DataFrameShuffleRun(
-            column=result["column"],
-            worker_for=result["worker_for"],
-            output_workers=result["output_workers"],
-            id=shuffle_id,
-            run_id=result["run_id"],
-            directory=os.path.join(
-                self.worker.local_directory,
-                f"shuffle-{shuffle_id}-{result['run_id']}",
-            ),
-            executor=self._executor,
-            local_address=self.worker.address,
-            rpc=self.worker.rpc,
-            scheduler=self.worker.scheduler,
-            memory_limiter_disk=self.memory_limiter_disk,
-            memory_limiter_comms=self.memory_limiter_comms,
-        )
+        shuffle = self._create_dataframe_shuffle_run(shuffle_id, result)
     elif result["type"] == ShuffleType.ARRAY_RECHUNK:
-        shuffle = ArrayRechunkRun(
-            worker_for=result["worker_for"],
-            output_workers=result["output_workers"],
-            old=result["old"],
-            new=result["new"],
-            id=shuffle_id,
-            run_id=result["run_id"],
-            directory=os.path.join(
-                self.worker.local_directory,
-                f"shuffle-{shuffle_id}-{result['run_id']}",
-            ),
-            executor=self._executor,
-            local_address=self.worker.address,
-            rpc=self.worker.rpc,
-            scheduler=self.worker.scheduler,
-            memory_limiter_disk=self.memory_limiter_disk,
-            memory_limiter_comms=self.memory_limiter_comms,
-        )
+        shuffle = self._create_array_rechunk_run(shuffle_id, result)
     else:  # pragma: no cover
         raise TypeError(result["type"])
     self.shuffles[shuffle_id] = shuffle
     self._runs.add(shuffle)
     return shuffle

 def _create_dataframe_shuffle_run(
     self, shuffle_id: ShuffleId, result: dict[str, Any]
 ) -> DataFrameShuffleRun:
     return DataFrameShuffleRun(
         column=result["column"],
         worker_for=result["worker_for"],
         output_workers=result["output_workers"],
         id=shuffle_id,
         run_id=result["run_id"],
         directory=os.path.join(
             self.worker.local_directory,
             f"shuffle-{shuffle_id}-{result['run_id']}",
         ),
         executor=self._executor,
         local_address=self.worker.address,
         rpc=self.worker.rpc,
         scheduler=self.worker.scheduler,
         memory_limiter_disk=self.memory_limiter_disk,
         memory_limiter_comms=self.memory_limiter_comms,
     )

 def _create_array_rechunk_run(
     self, shuffle_id: ShuffleId, result: dict[str, Any]
 ) -> ArrayRechunkRun:
     return ArrayRechunkRun(
         worker_for=result["worker_for"],
         output_workers=result["output_workers"],
         old=result["old"],
         new=result["new"],
         id=shuffle_id,
         run_id=result["run_id"],
         directory=os.path.join(
             self.worker.local_directory,
             f"shuffle-{shuffle_id}-{result['run_id']}",
         ),
         executor=self._executor,
         local_address=self.worker.address,
         rpc=self.worker.rpc,
         scheduler=self.worker.scheduler,
         memory_limiter_disk=self.memory_limiter_disk,
         memory_limiter_comms=self.memory_limiter_comms,
     )

Member Author:

Cosmetic refactoring to make it easier to understand whether we could potentially encounter races.

@wence- (Contributor) left a comment:

Not sure about the maintenance of invariants in remove_worker; also a request similar to @fjetter's for some coverage of edge cases.


Comment on lines 169 to 170
if worker not in self.scheduler.workers:
    raise RuntimeError(f"Scheduler is unaware of this worker {worker!r}")
Contributor:

Can this be tested by retiring a worker during a shuffle in a test?

Member Author:

I haven't been able to come up with a scenario where this would happen, but given how messy worker shutdown can be, I'm not 100% certain this would never happen. Left it in with a note for now.

Comment on lines 367 to 368
if worker not in shuffle.participating_workers:
    continue
Contributor:

Test by adding a worker to the cluster and then restarting a shuffle?

Member Author:

refactored.


stimulus_id = f"shuffle-failed-worker-left-{time()}"
self._restart_shuffle(shuffle.id, scheduler, stimulus_id=stimulus_id)
Contributor:

OK, so first we restart all shuffles that were interrupted.

Contributor:

I think that restarting this shuffle should remove it from _archived_by, but I do not see that happening. Do I have that right? Or does this somehow create a new shuffle object that has _archived_by = None? Otherwise it seems like it might get lost in _clean_on_scheduler.

Member Author:

Restarting a shuffle removes the ShuffleState from active states. The first shuffle_transfer task to call shuffle_get_or_create will cause the SchedulerPlugin to create a new ShuffleState with an incremented run_id and _archived_by = None.
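A hedged sketch of the flow described here (simplified; _run_id_counter, spec, and the ShuffleState signature are illustrative, not the actual plugin code):

def shuffle_get_or_create(self, shuffle_id: ShuffleId, spec: dict) -> ShuffleState:
    try:
        return self.active_shuffles[shuffle_id]
    except KeyError:
        pass
    # After a restart, the failed state is gone from active_shuffles, so the
    # first shuffle_transfer task to land here builds a fresh state with a
    # bumped run_id and no archive marker (_archived_by is None).
    self._run_id_counter += 1
    state = ShuffleState(id=shuffle_id, run_id=self._run_id_counter, spec=spec)
    self.active_shuffles[shuffle_id] = state
    return state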

# If processing the transactions causes a task to get released, this
# removes the shuffle from self.active_shuffles. Therefore, we must iterate
# over a copy.
for shuffle_id, shuffle in self.active_shuffles.copy().items():
Contributor:

Then we iterate over all active shuffles, remove and restart?

Why do we not unconditionally restart the archived shuffles after this loop over active shuffles?

Member Author:

I'm not 100% sure I'm following, but what I think you're saying is a very good point.


Comment on lines 901 to 902
async with self._runs_condition:
    await self._runs_condition.wait_for(lambda: not self._runs)
Contributor:

This lock protects _runs wrt _close_shuffle_run but not wrt _refresh_shuffle I think.

Member Author:

I've renamed it to _runs_cleanup_condition to highlight that it's only concerned with cleanup. There's a different mechanism in place for adding to self._runs. (Feel free to refactor in a follow-up if you see a good way of doing so.)

@hendrikmakait (Member Author) commented:

I've added another test; all feedback should now be addressed. For the # pragma: no cover code, I haven't been able to create a reproducer, but I think it doesn't hurt to leave those few places in the codebase just in case. This should be ready for another review.

@wence- (Contributor) left a comment:

To the best of my understanding, this looks right!

@hendrikmakait hendrikmakait merged commit f0303aa into dask:main Jul 24, 2023