Description
I think there is a logic error in the bookkeeping for TaskGroup.nbytes_in_memory. There's a discrepancy between how we increment it and how we decrement it when multiple workers hold the same key.
In transition_memory_released, we decrement it by nbytes once for every worker that holds that task:
distributed/distributed/scheduler.py, lines 2646 to 2649 in 6340b5b
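A minimal sketch of the per-replica decrement pattern described above. TaskGroup and TaskState here are hypothetical, simplified stand-ins (not the real scheduler classes), and memory_released is an illustrative function, not the actual transition code. The group starts with the key counted once, as if it had been incremented a single time:

```python
from dataclasses import dataclass, field


@dataclass
class TaskGroup:
    # Simplified stand-in: only the counter under discussion.
    nbytes_in_memory: int = 0


@dataclass
class TaskState:
    group: TaskGroup
    nbytes: int
    # Addresses of workers holding a copy of the key.
    who_has: set = field(default_factory=set)


def memory_released(ts: TaskState) -> None:
    # One decrement per replica, as described for transition_memory_released.
    for _worker in list(ts.who_has):
        ts.group.nbytes_in_memory -= ts.nbytes
    ts.who_has.clear()


tg = TaskGroup(nbytes_in_memory=100)  # key counted once
ts = TaskState(group=tg, nbytes=100, who_has={"worker-a", "worker-b"})
memory_released(ts)
print(tg.nbytes_in_memory)  # -100
```

With two replicas, the counter ends below zero even though the key was only ever counted once.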
Whereas in _propagate_forgotten, we decrement it by nbytes only once if any workers hold the task, regardless of how many. This doesn't match transition_memory_released:
distributed/distributed/scheduler.py, lines 7339 to 7341 in 646b12b
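A matching sketch of the single decrement in _propagate_forgotten, again with hypothetical, simplified classes (propagate_forgotten here is illustrative, not the scheduler method):

```python
from dataclasses import dataclass, field


@dataclass
class TaskGroup:
    nbytes_in_memory: int = 0


@dataclass
class TaskState:
    group: TaskGroup
    nbytes: int
    who_has: set = field(default_factory=set)


def propagate_forgotten(ts: TaskState) -> None:
    # A single decrement if any replica exists, no matter how many
    # workers hold the key.
    if ts.who_has:
        ts.group.nbytes_in_memory -= ts.nbytes
    ts.who_has.clear()


tg = TaskGroup(nbytes_in_memory=100)
ts = TaskState(group=tg, nbytes=100, who_has={"worker-a", "worker-b"})
propagate_forgotten(ts)
print(tg.nbytes_in_memory)  # 0
```

With two replicas of a 100-byte key, this path subtracts 100 once, whereas a per-replica loop like the one in transition_memory_released would subtract 200, so the final counter depends on which path releases the key.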
On the creation side, in TaskState.set_nbytes, we only increment it by the diff between the last known value and the current value. If the key is being copied to multiple workers, this difference is usually 0:
distributed/distributed/scheduler.py, lines 1556 to 1562 in 6340b5b
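A minimal sketch of the diff-based increment, using hypothetical, simplified classes. The -1 "size not yet known" sentinel is an assumption made for illustration:

```python
from dataclasses import dataclass, field


@dataclass
class TaskGroup:
    nbytes_in_memory: int = 0


@dataclass
class TaskState:
    group: TaskGroup
    nbytes: int = -1  # assumed sentinel: size not yet known
    who_has: set = field(default_factory=set)

    def set_nbytes(self, nbytes: int) -> None:
        # Only the change relative to the last known size reaches the group.
        diff = nbytes if self.nbytes < 0 else nbytes - self.nbytes
        self.group.nbytes_in_memory += diff
        self.nbytes = nbytes


tg = TaskGroup()
ts = TaskState(group=tg)
for worker in ("worker-a", "worker-b"):  # key computed, then replicated
    ts.who_has.add(worker)
    ts.set_nbytes(100)
print(tg.nbytes_in_memory)  # 100
```

Replicating the key to a second worker reports the same size again, so the diff is 0 and the group is credited for only one copy.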
In short, I think TaskGroup.nbytes_in_memory is incremented once per key, but decremented once per copy of the key.
If nbytes can be different for different workers, then to do this bookkeeping correctly, I think we'd also need to track TaskState.total_nbytes (the size of all copies of the key), then decrement by that once in both transition_memory_released and _propagate_forgotten.
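One possible reading of this proposal, sketched with hypothetical, simplified classes: nbytes_in_memory counts every copy, each new replica adds its own size, and every release path subtracts total_nbytes exactly once. The names nbytes_per_worker, add_replica and release_all are illustrative only, not existing scheduler APIs:

```python
from dataclasses import dataclass, field


@dataclass
class TaskGroup:
    nbytes_in_memory: int = 0


@dataclass
class TaskState:
    group: TaskGroup
    # Hypothetical: size of the copy held by each worker, keyed by address.
    nbytes_per_worker: dict = field(default_factory=dict)

    @property
    def total_nbytes(self) -> int:
        # Size of all copies of the key (the proposed TaskState.total_nbytes).
        return sum(self.nbytes_per_worker.values())

    def add_replica(self, worker: str, nbytes: int) -> None:
        # Count every copy, adjusting if this worker's copy changed size.
        old = self.nbytes_per_worker.get(worker, 0)
        self.group.nbytes_in_memory += nbytes - old
        self.nbytes_per_worker[worker] = nbytes

    def release_all(self) -> None:
        # A single decrement by total_nbytes, shared by every release path.
        self.group.nbytes_in_memory -= self.total_nbytes
        self.nbytes_per_worker.clear()


tg = TaskGroup()
ts = TaskState(group=tg)
ts.add_replica("worker-a", 100)
ts.add_replica("worker-b", 120)  # sizes may differ between workers
print(tg.nbytes_in_memory)  # 220
ts.release_all()
print(tg.nbytes_in_memory)  # 0
```

Because the increments and the decrement are both expressed per copy, the counter returns to zero no matter how many replicas existed or how their sizes differed.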
Discovered in #4925 (comment). I think #4925 made this more apparent, since it encourages more data replication.
cc @crusaderky since you know more about replicated keys.