@@ -466,25 +466,25 @@ def __init__(
466
466
validate = dask .config .get ("distributed.scheduler.validate" )
467
467
self .validate = validate
468
468
self ._transitions_table = {
469
- ("constrained" , "executing" ): self .transition_constrained_executing__recs ,
470
- ("executing" , "error" ): self .transition_executing_error__recs ,
471
- ("long-running" , "error" ): self .transition_executing_error__recs ,
472
- ("executing" , "long-running" ): self .transition_executing_long_running__recs ,
473
- ("executing" , "memory" ): self .transition_executing_memory__recs ,
474
- ("long-running" , "memory" ): self .transition_executing_memory__recs ,
475
- ("fetch" , "flight" ): self .transition_fetch_flight__recs ,
476
- ("fetch" , "released" ): self .transition_fetch_released__recs ,
477
- ("flight" , "error" ): self .transition_flight_error__recs ,
478
- ("flight" , "fetch" ): self .transition_flight_fetch__recs ,
479
- ("flight" , "memory" ): self .transition_flight_memory__recs ,
480
- ("flight" , "released" ): self .transition_flight_released__recs ,
481
- ("memory" , "released" ): self .transition_memoery_released__recs ,
482
- ("ready" , "error" ): self .transition_generic_error__recs ,
483
- ("ready" , "executing" ): self .transition_ready_executing__recs ,
484
- ("released" , "fetch" ): self .transition_released_fetch__recs ,
485
- ("released" , "waiting" ): self .transition_released_waiting__recs ,
486
- ("waiting" , "constrained" ): self .transition_waiting_constrained__recs ,
487
- ("waiting" , "ready" ): self .transition_waiting_ready__recs ,
469
+ ("constrained" , "executing" ): self .transition_constrained_executing ,
470
+ ("executing" , "error" ): self .transition_executing_error ,
471
+ ("long-running" , "error" ): self .transition_executing_error ,
472
+ ("executing" , "long-running" ): self .transition_executing_long_running ,
473
+ ("executing" , "memory" ): self .transition_executing_memory ,
474
+ ("long-running" , "memory" ): self .transition_executing_memory ,
475
+ ("fetch" , "flight" ): self .transition_fetch_flight ,
476
+ ("fetch" , "released" ): self .transition_fetch_released ,
477
+ ("flight" , "error" ): self .transition_flight_error ,
478
+ ("flight" , "fetch" ): self .transition_flight_fetch ,
479
+ ("flight" , "memory" ): self .transition_flight_memory ,
480
+ ("flight" , "released" ): self .transition_flight_released ,
481
+ ("memory" , "released" ): self .transition_memoery_released ,
482
+ ("ready" , "error" ): self .transition_generic_error ,
483
+ ("ready" , "executing" ): self .transition_ready_executing ,
484
+ ("released" , "fetch" ): self .transition_released_fetch ,
485
+ ("released" , "waiting" ): self .transition_released_waiting ,
486
+ ("waiting" , "constrained" ): self .transition_waiting_constrained ,
487
+ ("waiting" , "ready" ): self .transition_waiting_ready ,
488
488
}
489
489
490
490
self ._transition_counter = 0
@@ -1691,7 +1691,7 @@ def compute_task(
1691
1691
for key , value in nbytes .items ():
1692
1692
self .tasks [key ].nbytes = value
1693
1693
1694
- def transition_released_fetch__recs (self , ts , * , stimulus_id ):
1694
+ def transition_released_fetch (self , ts , * , stimulus_id ):
1695
1695
if self .validate :
1696
1696
assert ts .state == "released"
1697
1697
assert ts .runspec is None
@@ -1702,7 +1702,7 @@ def transition_released_fetch__recs(self, ts, *, stimulus_id):
1702
1702
heapq .heappush (self .data_needed , (ts .priority , ts .key ))
1703
1703
return {}, []
1704
1704
1705
- def transition_released_waiting__recs (self , ts , * , stimulus_id ):
1705
+ def transition_released_waiting (self , ts , * , stimulus_id ):
1706
1706
if self .validate :
1707
1707
assert ts .state == "released"
1708
1708
assert all (d .key in self .tasks for d in ts .dependencies )
@@ -1721,7 +1721,7 @@ def transition_released_waiting__recs(self, ts, *, stimulus_id):
1721
1721
ts .state = "waiting"
1722
1722
return recommendations , []
1723
1723
1724
- def transition_fetch_flight__recs (self , ts , worker , * , stimulus_id ):
1724
+ def transition_fetch_flight (self , ts , worker , * , stimulus_id ):
1725
1725
if self .validate :
1726
1726
assert ts .state == "fetch"
1727
1727
assert ts .who_has
@@ -1732,13 +1732,13 @@ def transition_fetch_flight__recs(self, ts, worker, *, stimulus_id):
1732
1732
self .in_flight_tasks += 1
1733
1733
return {}, []
1734
1734
1735
- def transition_memoery_released__recs (self , ts , * , stimulus_id ):
1735
+ def transition_memoery_released (self , ts , * , stimulus_id ):
1736
1736
ts .state = "released"
1737
1737
ts .protected = False
1738
1738
self .release_key (ts .key )
1739
1739
return {}, []
1740
1740
1741
- def transition_waiting_constrained__recs (self , ts , * , stimulus_id ):
1741
+ def transition_waiting_constrained (self , ts , * , stimulus_id ):
1742
1742
if self .validate :
1743
1743
assert ts .state == "waiting"
1744
1744
assert not ts .waiting_for_data
@@ -1752,7 +1752,7 @@ def transition_waiting_constrained__recs(self, ts, *, stimulus_id):
1752
1752
self .constrained .append (ts .key )
1753
1753
return {}, []
1754
1754
1755
- def transition_waiting_ready__recs (self , ts , * , stimulus_id ):
1755
+ def transition_waiting_ready (self , ts , * , stimulus_id ):
1756
1756
if self .validate :
1757
1757
assert ts .state == "waiting"
1758
1758
assert not ts .waiting_for_data
@@ -1767,22 +1767,20 @@ def transition_waiting_ready__recs(self, ts, *, stimulus_id):
1767
1767
1768
1768
return {}, []
1769
1769
1770
- def transition_generic_error__recs (self , ts , exception , traceback , * , stimulus_id ):
1770
+ def transition_generic_error (self , ts , exception , traceback , * , stimulus_id ):
1771
1771
ts .exception = exception
1772
1772
ts .traceback = traceback
1773
1773
smsgs = [self .get_task_state_for_scheduler (ts )]
1774
1774
ts .state = "error"
1775
1775
return {}, smsgs
1776
1776
1777
- def transition_executing_error__recs (
1778
- self , ts , exception , traceback , * , stimulus_id
1779
- ):
1777
+ def transition_executing_error (self , ts , exception , traceback , * , stimulus_id ):
1780
1778
self .executing_count -= 1
1781
- return self .transition_generic_error__recs (
1779
+ return self .transition_generic_error (
1782
1780
ts , exception , traceback , stimulus_id = stimulus_id
1783
1781
)
1784
1782
1785
- def transition_executing_memory__recs (self , ts , value = no_value , * , stimulus_id ):
1783
+ def transition_executing_memory (self , ts , value = no_value , * , stimulus_id ):
1786
1784
if self .validate :
1787
1785
assert ts .state == "executing" or ts .key in self .long_running
1788
1786
assert not ts .waiting_for_data
@@ -1805,7 +1803,7 @@ def transition_executing_memory__recs(self, ts, value=no_value, *, stimulus_id):
1805
1803
s_msgs .append (self .get_task_state_for_scheduler (ts ))
1806
1804
return recommendations , s_msgs
1807
1805
1808
- def transition_constrained_executing__recs (self , ts , * , stimulus_id ):
1806
+ def transition_constrained_executing (self , ts , * , stimulus_id ):
1809
1807
if self .validate :
1810
1808
assert not ts .waiting_for_data
1811
1809
assert ts .key not in self .data
@@ -1822,7 +1820,7 @@ def transition_constrained_executing__recs(self, ts, *, stimulus_id):
1822
1820
self .loop .add_callback (self .execute , ts .key , stimulus_id = stimulus_id )
1823
1821
return {}, []
1824
1822
1825
- def transition_ready_executing__recs (self , ts , * , stimulus_id ):
1823
+ def transition_ready_executing (self , ts , * , stimulus_id ):
1826
1824
if self .validate :
1827
1825
assert not ts .waiting_for_data
1828
1826
assert ts .key not in self .data
@@ -1837,7 +1835,7 @@ def transition_ready_executing__recs(self, ts, *, stimulus_id):
1837
1835
self .loop .add_callback (self .execute , ts .key , stimulus_id = stimulus_id )
1838
1836
return {}, []
1839
1837
1840
- def transition_flight_fetch__recs (self , ts , * , stimulus_id ):
1838
+ def transition_flight_fetch (self , ts , * , stimulus_id ):
1841
1839
if self .validate :
1842
1840
assert ts .state == "flight"
1843
1841
@@ -1851,14 +1849,14 @@ def transition_flight_fetch__recs(self, ts, *, stimulus_id):
1851
1849
1852
1850
return {}, []
1853
1851
1854
- def transition_flight_error__recs (self , ts , exception , traceback , * , stimulus_id ):
1852
+ def transition_flight_error (self , ts , exception , traceback , * , stimulus_id ):
1855
1853
self .in_flight_tasks -= 1
1856
1854
ts .coming_from = None
1857
- return self .transition_generic_error__recs (
1855
+ return self .transition_generic_error (
1858
1856
ts , exception , traceback , stimulus_id = stimulus_id
1859
1857
)
1860
1858
1861
- def transition_flight_released__recs (self , ts , * , stimulus_id ):
1859
+ def transition_flight_released (self , ts , * , stimulus_id ):
1862
1860
if self .validate :
1863
1861
assert ts .state == "flight"
1864
1862
@@ -1875,7 +1873,7 @@ def transition_flight_released__recs(self, ts, *, stimulus_id):
1875
1873
1876
1874
return recommendations , scheduler_msgs
1877
1875
1878
- def transition_fetch_released__recs (self , ts , * , stimulus_id ):
1876
+ def transition_fetch_released (self , ts , * , stimulus_id ):
1879
1877
if self .validate :
1880
1878
assert ts .state == "fetch"
1881
1879
@@ -1890,9 +1888,7 @@ def transition_fetch_released__recs(self, ts, *, stimulus_id):
1890
1888
1891
1889
return recommendations , scheduler_msgs
1892
1890
1893
- def transition_executing_long_running__recs (
1894
- self , ts , compute_duration , * , stimulus_id
1895
- ):
1891
+ def transition_executing_long_running (self , ts , compute_duration , * , stimulus_id ):
1896
1892
1897
1893
if self .validate :
1898
1894
assert ts .state == "executing"
@@ -1910,7 +1906,7 @@ def transition_executing_long_running__recs(
1910
1906
self .io_loop .add_callback (self .ensure_computing )
1911
1907
return {}, scheduler_msgs
1912
1908
1913
- def transition_flight_memory__recs (self , ts , value , * , stimulus_id ):
1909
+ def transition_flight_memory (self , ts , value , * , stimulus_id ):
1914
1910
if self .validate :
1915
1911
assert ts .state == "flight"
1916
1912
@@ -2157,7 +2153,7 @@ def get_task_state_for_scheduler(self, ts):
2157
2153
"status" : "OK" ,
2158
2154
"key" : ts .key ,
2159
2155
"nbytes" : ts .nbytes ,
2160
- "thread" : self .threads .get (ts .key , list ( self . threads . values ())[ 0 ] ),
2156
+ "thread" : self .threads .get (ts .key ),
2161
2157
"type" : typ_serialized ,
2162
2158
"typename" : typename (typ ),
2163
2159
"metadata" : ts .metadata ,
@@ -2167,7 +2163,7 @@ def get_task_state_for_scheduler(self, ts):
2167
2163
"op" : "task-erred" ,
2168
2164
"status" : "error" ,
2169
2165
"key" : ts .key ,
2170
- "thread" : self .threads .get (ts .key , list ( self . threads . values ())[ 0 ] ),
2166
+ "thread" : self .threads .get (ts .key ),
2171
2167
"exception" : ts .exception ,
2172
2168
"traceback" : ts .traceback ,
2173
2169
}
@@ -2275,8 +2271,12 @@ async def gather_dep(
2275
2271
# Keep namespace clean since this func is long and has many
2276
2272
# dep*, *ts* variables
2277
2273
2278
- # This is awkward, see FIXME below
2279
2274
for dep_key in to_gather :
2275
+ # For diagnostics we want to attach the transfer to a single
2276
+ # task. this task is typically the next to be executed but
2277
+ # since we're fetching tasks for potentially many
2278
+ # dependents, an exact match is not possible.
2279
+ # If there are no dependents, this is a pure replica fetch
2280
2280
dep_ts = cause = self .tasks [dep_key ]
2281
2281
for dependent in dep_ts .dependents :
2282
2282
cause = dependent
0 commit comments