-
-
Notifications
You must be signed in to change notification settings - Fork 749
data_needed exclusively contains tasks in fetch state #6481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
63014ad to
a032754
Compare
| # Ensure maxsize is respected | ||
| l["d"] = 4 | ||
| assert len(l) == 3 | ||
| assert list(l.keys()) == ["c", "a", "d"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved from test_utils.py
|
|
||
| await fut2 | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests don't make sense anymore. Also, one of them directly tampers with the state which is a big no-no.
|
|
||
| @gen_cluster(client=True) | ||
| async def test_gather_dep_exception_one_task(c, s, a, b): | ||
| """Ensure an exception in a single task does not tear down an entire batch of gather_dep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is misleading.
This test was testing resilience to an exception in the transitions of a single task after gather_dep - which should be dealt with through @fail_hard.
A legitimate exception in a single key of the bundle in gather_dep, namely a task that fails to unpickle, does make the whole gather_dep fail for all tasks. There's no code whatsoever to deal with this use case.
3ee77db to
b6f5467
Compare
b6f5467 to
74640b8
Compare
|
This PR also removes a leak condition where a TaskState object remains in data_needed_per_worker after it's been forgotten. |
|
Ready for final review and merge @fjetter |
| elif ts not in recommendations: | ||
| ts.who_has.discard(worker) | ||
| self.has_what[worker].discard(ts.key) | ||
| self.data_needed_per_worker[worker].discard(ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How could it even ben in here? Shouldn't it have been removed when the task transitioned to flight?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there could have been a transition to fetch in the meantime.
5e1c229 to
23e6698
Compare
|
All of @gjoseph92's comments have been addressed |
fjetter
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just peeked over it and would trust @gjoseph92 with a more thorough review. I'm looking forward to this
| return value | ||
| heapq.heappop(self._heap) | ||
|
|
||
| def pop(self) -> T: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While the Heap prefix of the class already implies some non-constant operation, I wouldn't mind documenting the actual complexity since this can differ for heaps. However, I think the stdlib doesn't document this properly either and I'm ok with skipping this.
peek() is O(1) if you treat the bit that calls heappop as delayed housekeeping - e.g. you account for it in discard().
I think the amortized time for both peek and discard are constant. I don't think we should dive into deep algorithm complexity analysis here, though :)
| break | ||
| tasks.pop() | ||
| deps.add(ts.key) | ||
| self.data_needed.remove(ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
love it
| if ts.state == "fetch": | ||
| self.data_needed_per_worker[worker].remove(ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if ts.state == "fetch": | |
| self.data_needed_per_worker[worker].remove(ts) | |
| self.data_needed_per_worker[worker].discard(ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd much rather fail loudly if for any reason the assumption that fetch state and existence in data_needed are inextricably bound together fails
Uh oh!
There was an error while loading. Please reload this page.