Skip to content

Commit

Permalink
Remove indirection
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Dec 9, 2022
1 parent 5f1a41f commit 3c90f50
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions distributed/shuffle/_shuffle_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def __init__(self, worker: Worker) -> None:
worker.handlers["shuffle_receive"] = self.shuffle_receive
worker.handlers["shuffle_inputs_done"] = self.shuffle_inputs_done
worker.handlers["shuffle_fail"] = self.shuffle_fail
worker.stream_handlers["shuffle-forget"] = self.shuffle_forget
worker.stream_handlers["shuffle-fail"] = self.shuffle_fail
worker.extensions["shuffle"] = self

# Initialize
Expand Down Expand Up @@ -397,9 +397,6 @@ async def shuffle_fail(self, shuffle_id: ShuffleId, message: str) -> None:
await shuffle.close()
del self.shuffles[shuffle_id]

async def shuffle_forget(self, shuffle_id: ShuffleId) -> None:
await self.shuffle_fail(shuffle_id, message="Shuffle {shuffle_id} forgotten")

def add_partition(
self,
data: pd.DataFrame,
Expand Down Expand Up @@ -779,7 +776,13 @@ def transition(
shuffle_id = ShuffleSchedulerExtension.id_from_key(key)
participating_workers = self.participating_workers[shuffle_id]
worker_msgs = {
worker: [{"op": "shuffle-forget", "shuffle_id": shuffle_id}]
worker: [
{
"op": "shuffle-fail",
"shuffle_id": shuffle_id,
"message": f"Shuffle {shuffle_id} forgotten",
}
]
for worker in participating_workers
}
self._clean_on_scheduler(shuffle_id)
Expand Down

0 comments on commit 3c90f50

Please sign in to comment.