Skip to content

Conversation

@crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented May 31, 2022

  • Closes DEADLOCK: fetch->forgotten->fetch #6480
  • This PR also removes a leak condition where a TaskState object remains in data_needed_per_worker, potentially for a long time, after it's been forgotten.

@crusaderky crusaderky self-assigned this May 31, 2022
@github-actions
Copy link
Contributor

Unit Test Results

     14 files   -          1       14 suites   - 1   0s ⏱️ - 6h 26m 53s
   908 tests  -   1 911     855 ✔️  -   1 881    40 💤  -   40  13 +10 
2 423 runs   - 18 473  2 221 ✔️  - 17 733  181 💤  - 758  21 +18 

For more details on these failures, see this check.

Results for commit c441d35. ± Comparison against base commit c2b28cf.

# Ensure maxsize is respected
l["d"] = 4
assert len(l) == 3
assert list(l.keys()) == ["c", "a", "d"]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved from test_utils.py


await fut2


Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests don't make sense anymore. Also, one of them directly tampers with the state which is a big no-no.


@gen_cluster(client=True)
async def test_gather_dep_exception_one_task(c, s, a, b):
"""Ensure an exception in a single task does not tear down an entire batch of gather_dep
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is misleading.
This test was testing resilience to an exception in the transitions of a single task after gather_dep - which should be dealt with through @fail_hard.

A legitimate exception in a single key of the bundle in gather_dep, namely a task that fails to unpickle, does make the whole gather_dep fail for all tasks. There's no code whatsoever to deal with this use case.

@crusaderky crusaderky force-pushed the WSMR/forgotten_data_needed branch from b6f5467 to 74640b8 Compare June 1, 2022 14:28
@crusaderky
Copy link
Collaborator Author

crusaderky commented Jun 1, 2022

This PR also removes a leak condition where a TaskState object remains in data_needed_per_worker after it's been forgotten.
CC @hendrikmakait

@crusaderky crusaderky marked this pull request as ready for review June 1, 2022 16:19
@crusaderky crusaderky requested a review from fjetter June 1, 2022 16:20
@crusaderky
Copy link
Collaborator Author

Ready for final review and merge @fjetter

elif ts not in recommendations:
ts.who_has.discard(worker)
self.has_what[worker].discard(ts.key)
self.data_needed_per_worker[worker].discard(ts)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could it even ben in here? Shouldn't it have been removed when the task transitioned to flight?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there could have been a transition to fetch in the meantime.

@crusaderky crusaderky force-pushed the WSMR/forgotten_data_needed branch from 5e1c229 to 23e6698 Compare June 1, 2022 16:56
@crusaderky
Copy link
Collaborator Author

All of @gjoseph92's comments have been addressed

Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just peeked over it and would trust @gjoseph92 with a more thorough review. I'm looking forward to this

return value
heapq.heappop(self._heap)

def pop(self) -> T:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the Heap prefix of the class already implies some non-constant operation, I wouldn't mind documenting the actual complexity since this can differ for heaps. However, I think the stdlib doesn't document this properly either and I'm ok with skipping this.

peek() is O(1) if you treat the bit that calls heappop as delayed housekeeping - e.g. you account for it in discard().

I think the amortized time for both peek and discard are constant. I don't think we should dive into deep algorithm complexity analysis here, though :)

break
tasks.pop()
deps.add(ts.key)
self.data_needed.remove(ts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

love it

Comment on lines +3452 to +3453
if ts.state == "fetch":
self.data_needed_per_worker[worker].remove(ts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ts.state == "fetch":
self.data_needed_per_worker[worker].remove(ts)
self.data_needed_per_worker[worker].discard(ts)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd much rather fail loudly if for any reason the assumption that fetch state and existence in data_needed are inextricably bound together fails

@crusaderky crusaderky merged commit a341432 into dask:main Jun 1, 2022
@crusaderky crusaderky deleted the WSMR/forgotten_data_needed branch June 1, 2022 21:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DEADLOCK: fetch->forgotten->fetch

3 participants