
Ensure resumed tasks are not accidentally forgotten #6217

Merged · @fjetter merged 4 commits into dask:main from in_flight_released on May 6, 2022

Conversation

@fjetter (Member) commented Apr 27, 2022

closes #6194

  • Tests added / passed
  • Passes pre-commit run --all-files

Three changes in this PR:

  • Fix an erroneous resumed->released transition: previously this released the key outright, but if the task is still being processed it should instead transition to cancelled (see the sketch after this list).
  • Remove a redundant cancelled->resumed transition, which makes the logs much less verbose and easier to read. This transition was only an indirection, so I inlined the code at the two places where it matters. The transition log now reads as expected in these situations.
  • Ensure that ts._next is not set for cancelled tasks. Cancelled tasks should always transition to released once they are done.
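
A minimal sketch of the corrected resumed->released handling (the "if not ts.done" branch matches the diff hunk quoted further down in this conversation; transition_generic_released is a hypothetical stand-in for the already-done path, not necessarily the actual helper name):

# Sketch only, not the verbatim distributed/worker.py implementation
def transition_resumed_released(self, ts):
    if not ts.done:
        # The execute/gather coroutine is still running: park the task in
        # cancelled so it is released once that coroutine finishes, rather
        # than releasing it out from under the running code
        ts.state = "cancelled"
        ts._next = None
        return {}, []
    # Nothing in flight anymore; safe to release immediately
    return self.transition_generic_released(ts)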

@fjetter (Member, Author) commented Apr 27, 2022

The Windows test failure is a known issue: #6147

File "D:\a\distributed\distributed\distributed\worker.py", line 4053, in validate_task

    self.validate_task_fetch(ts)

  File "D:\a\distributed\distributed\distributed\worker.py", line 3995, in validate_task_fetch

    assert ts.who_has

AssertionError

2022-04-27 11:54:02,400 - distributed.core - ERROR - Invalid TaskState encountered for <TaskState "('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)" fetch>.

Story:

[("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'compute-task', 'compute-task-1651060440.6118023', 1651060440.640618),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'released', 'waiting', 'waiting', {"('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)": 'ready'}, 'compute-task-1651060440.6118023', 1651060440.6406472),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'waiting', 'ready', 'ready', {}, 'compute-task-1651060440.6118023', 1651060440.6406634),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'ready', 'executing', 'executing', {}, 'task-finished-1651060440.648988', 1651060440.6492662),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'put-in-memory', 'task-finished-1651060440.6506114', 1651060440.6508203),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'executing', 'memory', 'memory', {"('arange-sum-59ee542fb6a8f7c418bd1ceadfe68d11', 8)": 'executing'}, 'task-finished-1651060440.6506114', 1651060440.65087),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'ensure-task-exists', 'memory', 'compute-task-1651060440.6795359', 1651060440.705263),
 ('free-keys', ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)",), 'task-finished-1651060440.7063837', 1651060440.743776),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'release-key', 'task-finished-1651060440.7063837', 1651060440.7437923),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'memory', 'released', 'released', {}, 'task-finished-1651060440.7063837', 1651060440.7438428),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'released', 'forgotten', 'forgotten', {}, 'client-releases-keys-1651060442.0950112', 1651060442.1340976),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'compute-task', 'compute-task-1651060442.1040328', 1651060442.1347113),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'released', 'waiting', 'waiting', {"('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)": 'ready'}, 'compute-task-1651060442.1040328', 1651060442.1347415),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'waiting', 'ready', 'ready', {}, 'compute-task-1651060442.1040328', 1651060442.1347587),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'ready', 'executing', 'executing', {}, 'task-finished-1651060442.1677341', 1651060442.1682491),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'put-in-memory', 'task-finished-1651060442.169614', 1651060442.1697793),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'executing', 'memory', 'memory', {"('arange-sum-b267f04bd650ea06d6abf5234529c789', 8)": 'executing'}, 'task-finished-1651060442.169614', 1651060442.1698287),
 ('free-keys', ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)",), 'client-releases-keys-1651060442.290155', 1651060442.3110044),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'release-key', 'client-releases-keys-1651060442.290155', 1651060442.3110235),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'memory', 'released', 'released', {"('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)": 'forgotten'}, 'client-releases-keys-1651060442.290155', 1651060442.311108),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'released', 'forgotten', 'forgotten', {}, 'client-releases-keys-1651060442.290155', 1651060442.3111255),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'compute-task', 'compute-task-1651060442.3011189', 1651060442.3126717),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'released', 'waiting', 'waiting', {"('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)": 'ready'}, 'compute-task-1651060442.3011189', 1651060442.3127015),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'waiting', 'ready', 'ready', {}, 'compute-task-1651060442.3011189', 1651060442.3127184),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'ensure-task-exists', 'ready', 'compute-task-1651060442.3024514', 1651060442.3128352),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'release-key', 'compute-task-1651060442.3024514', 1651060442.3128846),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'ready', 'released', 'released', {}, 'compute-task-1651060442.3024514', 1651060442.3129065),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'released', 'fetch', 'fetch', {}, 'compute-task-1651060442.3024514', 1651060442.312925),
 ("('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)", 'ready', 'fetch', 'fetch', {}, 'compute-task-1651060442.3024514', 1651060442.31293)]
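
As an aside, such a story can be pulled from a live worker via Worker.story, roughly like this (a sketch; "worker" stands for a distributed.Worker instance and the key is the one from the failure above):

# Sketch: dump the transition story for one key from a worker's log
key = "('arange-sum-1960f5e08fa9c9689f5e9ef3be470377', 8)"
for entry in worker.story(key):
    print(entry)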

@github-actions bot (Contributor) commented Apr 27, 2022

Unit Test Results

       16 files  ±0        16 suites  ±0   7h 23m 52s ⏱️ −9m 40s
  2 760 tests  +3     2 680 ✔️  +5      78 💤 −1   2 ❌ −1
 22 042 runs  +24   21 023 ✔️ +25   1 017 💤 +1   2 ❌ −2

For more details on these failures, see this check.

Results for commit 1e23e1e. ± Comparison against base commit 2286896.

♻️ This comment has been updated with latest results.

@mrocklin (Member):

I took a brief look at this. No objection from me, but I didn't dive deeply into the logic. If tests pass and you're feeling confident about the added value, @fjetter, I think it's OK to merge. If you can get someone like @crusaderky or @gjoseph92 to take a look, that would of course be better.

@gjoseph92 (Collaborator) left a comment

I've never understood the flow for cancelled and released tasks, so I don't think I'm in a good position to review this. I find these states incredibly confusing. I'm looking forward to this hopefully making more sense with #5895.

distributed/worker.py
if not ts.done:
    ts.state = "cancelled"
    ts._next = None
    return {}, []
Collaborator:

What's going to eventually pick this up and move it out of cancelled if there are no recommendations and no next?

@fjetter (Member, Author):

TL;DR: once ts.done = True is set, i.e. the execute/flight coroutine has finished.

ts._next should never have been set for cancelled; when I implemented the cancelled/resumed states I made a few mistakes. The only relevant next state for cancelled is released. That's the entire point of the state: the worker was instructed to release a key but can't, because it is "stuck" waiting for something to finish, i.e. either the execution thread or the gather_dep coroutine.
Once execution/gather finishes, it recommends a transition, e.g. upon success a transition to memory. The cancelled->memory transition then ensures the key is released.

Why is this logic not implemented directly in the gather_dep/execute result parsers? Have a look at the code there: the gather_dep result parser/finally clause in particular is the most frequent source of deadlocks, because the logic just blows up.
There is a design philosophy behind this: break one big, complex decision into many small decisions that can each be made using local context information.

Consider the following example

  • T1 was instructed to be computed
  • T1 is dispatched to the threadpool
  • T1 is requested to be released
  • T1 finishes

Handling the result, i.e. once T1 finishes, could be implemented as:

if result == "success":
    if not ts.cancelled:
        put_key_in_memory()
    else:
        release_key()
else:  # result == "failed"
    if not ts.cancelled:
        if ts.asked_to_be_fetched_instead:
            # Whether this is a valid thing for the scheduler to ask is out
            # of scope for this comment. It happens/happened
            reschedule_to_fetch_key()
        else:
            mark_key_erred()
    else:
        release_key()

With this transition system, it instead becomes

# executing result parser
# This only requires local context; the decision is simple and straightforward
if result == "success":
    recommend_memory()
else:
    assert result == "failed"
    recommend_error()

def transition_cancelled_error(...):
    assert stuff
    release_task()

def transition_cancelled_memory(...):
    assert stuff
    put_key_in_memory()

This decision tree is a bit more complex for gathering keys. I'm not 100% convinced anymore that this is the right approach, but here we are right now. The recent refactoring will allow us, mid-term, to move away from this if we choose to do so.

@fjetter (Member, Author) commented May 5, 2022

I believe at least some of the test failures relate to #5910

distributed/worker.py
# We'll ignore instructions, i.e. we choose not to submit the failure
# message to the scheduler since from the scheduler's POV it already
# released this task
recs, _ = self.transition_executing_error(
Collaborator:

All transitions from executing call ensure_computing.
This should deadlock the worker if there are any tasks in the ready state.

@fjetter (Member, Author):

I added a test for this. It does not deadlock, since the transition generates a recommendation; only after acting on that recommendation do we get an instruction.
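
For context, a rough sketch of the recommendation-vs-instruction flow described here (hypothetical names, not the actual worker internals): the transition engine drains all recommendations before any instruction leaves the worker, so ensure_computing still gets its turn.

# Illustrative only: recommendations are exhausted first,
# instructions are handled afterwards
def transitions(self, recommendations):
    instructions = []
    while recommendations:
        key, finish = recommendations.popitem()
        # A single transition may produce both further recommendations
        # and instructions (e.g. a TaskErred message to the scheduler)
        more_recs, more_instructions = self._transition(key, finish)
        recommendations.update(more_recs)
        instructions.extend(more_instructions)
    # Only now are instructions acted upon
    self._handle_instructions(instructions)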

@crusaderky (Collaborator) left a comment

transition_cancelled_error is accidentally dropping Execute instructions

Comment on lines +197 to +199
# Queue up another task to ensure this is not affected by our error handling
fut2 = c.submit(inc, 1)
await wait_for_state(fut2.key, "ready", w)
@fjetter (Member, Author):

@crusaderky this triggers the condition you were concerned about in the transition function, i.e. dropped instructions.

The task is queued up and we receive a recommendation. The only instruction at that point is the TaskErred message.
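
Pieced together, such a test might look roughly like this (a sketch only, assuming distributed.utils_test helpers such as gen_cluster, inc, and wait_for_state; not the verbatim test added in this PR):

from distributed import Event
from distributed.utils_test import gen_cluster, inc, wait_for_state

@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_cancelled_error_keeps_scheduling(c, s, w):
    ev = Event()

    def blocked_fail(ev):
        ev.wait()
        raise RuntimeError("expected failure")

    # Occupy the only thread with a task that will eventually fail
    fut = c.submit(blocked_fail, ev, key="x")
    await wait_for_state("x", "executing", w)

    # Queue up another task to ensure this is not affected by our error handling
    fut2 = c.submit(inc, 1, key="y")
    await wait_for_state("y", "ready", w)

    # Release x while it is still executing; the worker keeps it as cancelled
    fut.release()
    # Let x finish and fail while in the cancelled state
    await ev.set()

    # The queued task must still be scheduled and complete
    assert await fut2 == 2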

@fjetter fjetter merged commit 7c57fdc into dask:main May 6, 2022
@fjetter fjetter deleted the in_flight_released branch May 6, 2022 13:56
@crusaderky (Collaborator):

I think there may be a regression here: #6305 (comment)

Successfully merging this pull request may close: KeyError in gather_dep (#6194).