Conversation

@fjetter (Member) commented May 6, 2025

This raises transition errors such as:

2025-05-06 13:23:15,012 - distributed.scheduler - INFO - User asked for computation on lost data. Final key is foo with missing dependency bar
2025-05-06 13:23:15,015 - distributed.scheduler - ERROR - Error transitioning 'bar' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2037, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2488, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 3410, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 3578, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState 'bar' processing>
2025-05-06 13:23:15,018 - distributed.scheduler - ERROR - <TaskState 'bar' processing>
2025-05-06 13:23:15,018 - distributed.protocol.pickle - ERROR - Failed to serialize <TaskState 'bar' processing>.
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 4936, in update_graph
    metrics = self._create_taskstate_from_graph(
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 4809, in _create_taskstate_from_graph
    self.transitions(recommendations, stimulus_id)
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 8294, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2154, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2037, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2488, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 3410, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 3578, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState 'bar' processing>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle 'weakref.ReferenceType' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
TypeError: cannot pickle 'weakref.ReferenceType' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/Users/fjetter/miniforge3/envs/dask-distributed/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/Users/fjetter/miniforge3/envs/dask-distributed/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
TypeError: cannot pickle 'weakref.ReferenceType' object

If validate is enabled, as in our test suite, the error is simply:

2025-05-06 13:19:47,262 - distributed.scheduler - ERROR - Error transitioning 'bar' from 'released' to 'waiting'
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2037, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/Users/fjetter/workspace/distributed/distributed/scheduler.py", line 2172, in _transition_released_waiting
    assert ts.run_spec
AssertionError


# We'll need the list to be
# ['key', 'lost_dep_of_key']
keys = key, lost_dep_of_key = list({"foo", "bar"})

@fjetter (Member, Author) commented:

The behavior of this depends on the ordering of the keys. In particular, _find_lost_dependencies misbehaves depending on the order produced by this line:

for k in list(keys):

With this test setup we always exercise the case where the one key is not discarded as it should be.
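
As a side note, here is a standalone sketch of why that ordering varies (this is not the PR's test code): string hashes are randomized per interpreter run, so the iteration order of the set {"foo", "bar"}, and therefore which key the loop above sees first, can change between runs.

# Standalone illustration, not part of the PR.
keys = list({"foo", "bar"})
print(keys)  # may be ['foo', 'bar'] or ['bar', 'foo'] depending on PYTHONHASHSEED

# A test that needs deterministic coverage of both orders can spell them out:
for key, lost_dep_of_key in [("foo", "bar"), ("bar", "foo")]:
    print(f"final key: {key}, lost dependency: {lost_dep_of_key}")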

@fjetter (Member, Author) commented May 6, 2025

The weakref failure happens in error_message while handling the exception in update_graph. It is just logging, which is why the traceback is cut off.

The weakref this refers to is probably TaskState._instances.
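
For context, the unpicklability is a generic Python property: any object whose state holds a weakref fails to pickle. A minimal sketch, not distributed's TaskState:

import pickle
import weakref

class Target:
    pass

class Holder:
    # An instance whose state contains a weakref.ref cannot be pickled,
    # which matches the TypeError in the log above.
    def __init__(self, target):
        self.ref = weakref.ref(target)

target = Target()
try:
    pickle.dumps(Holder(target))
except TypeError as exc:
    print(exc)  # e.g. "cannot pickle 'weakref.ReferenceType' object"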

@github-actions (bot, Contributor) commented May 6, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    27 files ±0       27 suites ±0      11h 4m 1s ⏱️ -7s
 4 107 tests  +1    3 993 ✅  -1      111 💤  -1   2 ❌ +2   1 🔥 +1
51 492 runs  +14   49 205 ✅ +24   2 284 💤 -13   2 ❌ +2   1 🔥 +1

For more details on these failures and errors, see this check.

Results for commit 04a713a, compared against base commit 10b6288.

This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.
distributed.tests.test_client ‑ test_worker_clients_do_not_claim_ownership_of_serialize_futures[False-False]
distributed.tests.test_client ‑ test_compute_partially_forgotten[False]
distributed.tests.test_client ‑ test_compute_partially_forgotten[True]

♻️ This comment has been updated with latest results.

@fjetter (Member, Author) commented May 6, 2025

I think as a "fix" I will just abort update_graph if any key does not exist. The situation where one wants to continue is rather artificial and it drives up complexity a lot.
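
For illustration, the shape of such an early abort could look roughly like this (assumed names and signature, not the actual patch to update_graph):

# Rough sketch only. The real scheduler uses self._find_lost_dependencies(dsk, keys);
# the standalone signature and the exception type here are assumptions.
def abort_if_keys_lost(dsk: dict, keys: list, known_tasks: set) -> None:
    lost = {k for k in keys if k not in dsk and k not in known_tasks}
    if lost:
        # Cancel the whole graph update instead of continuing with a
        # partially forgotten graph.
        raise RuntimeError(f"Computation requested on lost data; missing keys: {lost}")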

@fjetter fjetter changed the title from "Partially forgotten dependnecies" to "Partially forgotten dependencies" on May 6, 2025
@fjetter fjetter force-pushed the transition_error_deps_released branch from 089403c to 26dc7d8 on May 7, 2025 08:52
@fjetter fjetter marked this pull request as ready for review May 7, 2025 09:06

Copilot AI left a comment

Pull Request Overview

This PR aims to address issues related to partially forgotten dependencies that cause transition errors during graph updates. Key changes include:

  • Adding a new test (test_compute_partially_forgotten) in distributed/tests/test_client.py to simulate lost dependencies.
  • Modifying the _find_lost_dependencies function in distributed/scheduler.py to remove in-place mutation of dependency data (a generic sketch of this idea follows this list).
  • Adjusting the lost dependency check in update_graph to report and cancel the graph update when lost dependencies are detected.
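
A generic sketch of the "no in-place mutation" idea (assumed signature and names, not the actual _find_lost_dependencies implementation):

# Collect lost dependencies into a new set while only reading the inputs,
# instead of deleting entries from the dependency data during iteration.
def find_lost_dependencies(dsk: dict, dependencies: dict, known_tasks: set) -> set:
    lost = set()
    for deps in dependencies.values():
        for dep in deps:
            if dep not in dsk and dep not in known_tasks:
                lost.add(dep)
    return lost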

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

distributed/tests/test_client.py: Added a new parametrized test that verifies behavior when a task’s lost dependency triggers a cancellation.
distributed/scheduler.py: Updated the lost dependencies logic in _find_lost_dependencies and update_graph to change how task keys are reported and released.
Comments suppressed due to low confidence (1)

distributed/scheduler.py:4957

  • The update_graph branch now uses the full keys set rather than a computed lost_keys set when detecting lost dependencies. Confirm that this change in behavior is intentional and that canceling the graph update based on the entire keys set does not lead to unintended cancellations.
if self._find_lost_dependencies(dsk, keys):

# ['key', 'lost_dep_of_key']
# At the time of writing, it is unclear why the lost_dep_of_key is part of
# keys but this triggers an observed error
keys = key, lost_dep_of_key = list({"foo", "bar"})

Copilot AI commented May 7, 2025

[nitpick] This tuple assignment simultaneously defining 'key', 'lost_dep_of_key' and 'keys' can be confusing. Consider splitting the assignment into two separate statements for better clarity.

Suggested change
keys = key, lost_dep_of_key = list({"foo", "bar"})
keys = list({"foo", "bar"})
key, lost_dep_of_key = keys

@fjetter fjetter merged commit 01ea1eb into dask:main May 7, 2025
29 of 33 checks passed
@fjetter fjetter deleted the transition_error_deps_released branch May 8, 2025 07:37