Skip to content

Commit 1818788

Browse files
authored
Revert "Limit incoming data transfers by amount of data" (#6994)
This reverts commit 6da758b.
1 parent 7f83c7b commit 1818788

File tree

5 files changed: 19 additions (+19) and 268 deletions (-268).

distributed/distributed-schema.yaml

Lines changed: 3 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -508,21 +508,13 @@ properties:
508508
donate data and only nodes below 45% will receive them. This helps
509509
avoid data from bouncing around the cluster repeatedly.
510510
511-
transfer:
512-
oneOf:
513-
- {type: number, minimum: 0, maximum: 1}
514-
- {enum: [false]}
515-
description: >-
516-
When the total size of incoming data transfers gets above this amount,
517-
we start throttling incoming data transfers
518-
519511
target:
520512
oneOf:
521513
- {type: number, minimum: 0, maximum: 1}
522514
- {enum: [false]}
523515
description: >-
524516
When the process memory (as observed by the operating system) gets
525-
above this amount, we start spilling the dask keys holding the oldest
517+
above this amount we start spilling the dask keys holding the largest
526518
chunks of data to disk
527519
528520
spill:
@@ -531,18 +523,15 @@ properties:
531523
- {enum: [false]}
532524
description: >-
533525
When the process memory (as observed by the operating system) gets
534-
above this amount, we spill data to disk, starting from the dask keys
535-
holding the oldest chunks of data, until the process memory falls below
536-
the target threshold.
526+
above this amount we spill all data to disk.
537527
538528
pause:
539529
oneOf:
540530
- {type: number, minimum: 0, maximum: 1}
541531
- {enum: [false]}
542532
description: >-
543533
When the process memory (as observed by the operating system) gets
544-
above this amount, we no longer start new tasks or fetch new
545-
data on the worker.
534+
above this amount we no longer start new tasks on this worker.
546535
547536
terminate:
548537
oneOf:

distributed/distributed.yaml

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -143,9 +143,6 @@ distributed:
143143

144144
# Fractions of worker process memory at which we take action to avoid memory
145145
# blowup. Set any of the values to False to turn off the behavior entirely.
146-
# All fractions are relative to each worker's memory_limit.
147-
transfer: 0.10 # fractional size of incoming data transfers where we start
148-
# throttling incoming data transfers
149146
target: 0.60 # fraction of managed memory where we start spilling to disk
150147
spill: 0.70 # fraction of process memory where we start spilling to disk
151148
pause: 0.80 # fraction of process memory at which we pause worker threads

distributed/tests/test_worker_state_machine.py

Lines changed: 0 additions & 187 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
import asyncio
44
import gc
55
import pickle
6-
from collections import defaultdict
76
from collections.abc import Iterator
87

98
import pytest
@@ -1352,189 +1351,3 @@ def test_transfer_incoming_metrics(ws):
13521351
assert ws.transfer_incoming_bytes == 0
13531352
assert ws.transfer_incoming_count == 0
13541353
assert ws.transfer_incoming_count_total == 4
1355-
1356-
1357-
def test_throttling_does_not_affect_first_transfer(ws):
1358-
ws.transfer_incoming_count_limit = 100
1359-
ws.transfer_incoming_bytes_limit = 100
1360-
ws.transfer_incoming_bytes_throttle_threshold = 1
1361-
ws2 = "127.0.0.1:2"
1362-
ws.handle_stimulus(
1363-
ComputeTaskEvent.dummy(
1364-
"c",
1365-
who_has={"a": [ws2]},
1366-
nbytes={"a": 200},
1367-
stimulus_id="s1",
1368-
)
1369-
)
1370-
assert ws.tasks["a"].state == "flight"
1371-
1372-
1373-
def test_throttle_incoming_transfers_on_count_limit(ws):
1374-
ws.transfer_incoming_count_limit = 1
1375-
ws.transfer_incoming_bytes_limit = 100_000
1376-
ws.transfer_incoming_bytes_throttle_threshold = 1
1377-
ws2 = "127.0.0.1:2"
1378-
ws3 = "127.0.0.1:3"
1379-
who_has = {"a": [ws2], "b": [ws3]}
1380-
ws.handle_stimulus(
1381-
ComputeTaskEvent.dummy(
1382-
"c",
1383-
who_has=who_has,
1384-
nbytes={"a": 100, "b": 100},
1385-
stimulus_id="s1",
1386-
)
1387-
)
1388-
tasks_by_state = defaultdict(list)
1389-
for ts in ws.tasks.values():
1390-
tasks_by_state[ts.state].append(ts)
1391-
assert len(tasks_by_state["flight"]) == 1
1392-
assert len(tasks_by_state["fetch"]) == 1
1393-
assert ws.transfer_incoming_bytes == 100
1394-
1395-
in_flight_task = tasks_by_state["flight"][0]
1396-
ws.handle_stimulus(
1397-
GatherDepSuccessEvent(
1398-
worker=who_has[in_flight_task.key][0],
1399-
data={in_flight_task.key: 123},
1400-
total_nbytes=100,
1401-
stimulus_id="s2",
1402-
)
1403-
)
1404-
assert tasks_by_state["flight"][0].state == "memory"
1405-
assert tasks_by_state["fetch"][0].state == "flight"
1406-
assert ws.transfer_incoming_bytes == 100
1407-
1408-
1409-
def test_throttling_incoming_transfer_on_transfer_bytes_same_worker(ws):
1410-
ws.transfer_incoming_count_limit = 100
1411-
ws.transfer_incoming_bytes_limit = 250
1412-
ws.transfer_incoming_bytes_throttle_threshold = 1
1413-
ws2 = "127.0.0.1:2"
1414-
ws.handle_stimulus(
1415-
ComputeTaskEvent.dummy(
1416-
"d",
1417-
who_has={"a": [ws2], "b": [ws2], "c": [ws2]},
1418-
nbytes={"a": 100, "b": 100, "c": 100},
1419-
stimulus_id="s1",
1420-
)
1421-
)
1422-
tasks_by_state = defaultdict(list)
1423-
for ts in ws.tasks.values():
1424-
tasks_by_state[ts.state].append(ts)
1425-
assert ws.transfer_incoming_bytes == 200
1426-
assert len(tasks_by_state["flight"]) == 2
1427-
assert len(tasks_by_state["fetch"]) == 1
1428-
1429-
ws.handle_stimulus(
1430-
GatherDepSuccessEvent(
1431-
worker=ws2,
1432-
data={ts.key: 123 for ts in tasks_by_state["flight"]},
1433-
total_nbytes=200,
1434-
stimulus_id="s2",
1435-
)
1436-
)
1437-
assert all(ts.state == "memory" for ts in tasks_by_state["flight"])
1438-
assert all(ts.state == "flight" for ts in tasks_by_state["fetch"])
1439-
1440-
1441-
def test_throttling_incoming_transfer_on_transfer_bytes_different_workers(ws):
1442-
ws.transfer_incoming_count_limit = 100
1443-
ws.transfer_incoming_bytes_limit = 150
1444-
ws.transfer_incoming_bytes_throttle_threshold = 1
1445-
ws2 = "127.0.0.1:2"
1446-
ws3 = "127.0.0.1:3"
1447-
who_has = {"a": [ws2], "b": [ws3]}
1448-
ws.handle_stimulus(
1449-
ComputeTaskEvent.dummy(
1450-
"c",
1451-
who_has=who_has,
1452-
nbytes={"a": 100, "b": 100},
1453-
stimulus_id="s1",
1454-
)
1455-
)
1456-
tasks_by_state = defaultdict(list)
1457-
for ts in ws.tasks.values():
1458-
tasks_by_state[ts.state].append(ts)
1459-
assert ws.transfer_incoming_bytes == 100
1460-
assert len(tasks_by_state["flight"]) == 1
1461-
assert len(tasks_by_state["fetch"]) == 1
1462-
1463-
in_flight_task = tasks_by_state["flight"][0]
1464-
ws.handle_stimulus(
1465-
GatherDepSuccessEvent(
1466-
worker=who_has[in_flight_task.key][0],
1467-
data={in_flight_task.key: 123},
1468-
total_nbytes=100,
1469-
stimulus_id="s2",
1470-
)
1471-
)
1472-
assert tasks_by_state["flight"][0].state == "memory"
1473-
assert tasks_by_state["fetch"][0].state == "flight"
1474-
1475-
1476-
def test_do_not_throttle_connections_while_below_threshold(ws):
1477-
ws.transfer_incoming_count_limit = 1
1478-
ws.transfer_incoming_bytes_limit = 200
1479-
ws.transfer_incoming_bytes_throttle_threshold = 50
1480-
ws2 = "127.0.0.1:2"
1481-
ws3 = "127.0.0.1:3"
1482-
ws4 = "127.0.0.1:4"
1483-
ws.handle_stimulus(
1484-
ComputeTaskEvent.dummy(
1485-
"b",
1486-
who_has={"a": [ws2]},
1487-
nbytes={"a": 1},
1488-
stimulus_id="s1",
1489-
)
1490-
)
1491-
assert ws.tasks["a"].state == "flight"
1492-
1493-
ws.handle_stimulus(
1494-
ComputeTaskEvent.dummy(
1495-
"d",
1496-
who_has={"c": [ws3]},
1497-
nbytes={"c": 1},
1498-
stimulus_id="s2",
1499-
)
1500-
)
1501-
assert ws.tasks["c"].state == "flight"
1502-
1503-
ws.handle_stimulus(
1504-
ComputeTaskEvent.dummy(
1505-
"f",
1506-
who_has={"e": [ws4]},
1507-
nbytes={"e": 100},
1508-
stimulus_id="s3",
1509-
)
1510-
)
1511-
assert ws.tasks["e"].state == "flight"
1512-
assert ws.transfer_incoming_bytes == 102
1513-
1514-
1515-
def test_throttle_on_transfer_bytes_regardless_of_threshold(ws):
1516-
ws.transfer_incoming_count_limit = 1
1517-
ws.transfer_incoming_bytes_limit = 100
1518-
ws.transfer_incoming_bytes_throttle_threshold = 50
1519-
ws2 = "127.0.0.1:2"
1520-
ws3 = "127.0.0.1:3"
1521-
ws.handle_stimulus(
1522-
ComputeTaskEvent.dummy(
1523-
"b",
1524-
who_has={"a": [ws2]},
1525-
nbytes={"a": 1},
1526-
stimulus_id="s1",
1527-
)
1528-
)
1529-
assert ws.tasks["a"].state == "flight"
1530-
1531-
ws.handle_stimulus(
1532-
ComputeTaskEvent.dummy(
1533-
"d",
1534-
who_has={"c": [ws3]},
1535-
nbytes={"c": 100},
1536-
stimulus_id="s2",
1537-
)
1538-
)
1539-
assert ws.tasks["c"].state == "fetch"
1540-
assert ws.transfer_incoming_bytes == 1

distributed/worker.py

Lines changed: 0 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -745,18 +745,6 @@ def __init__(
745745
memory_spill_fraction=memory_spill_fraction,
746746
memory_pause_fraction=memory_pause_fraction,
747747
)
748-
749-
transfer_incoming_bytes_limit = None
750-
transfer_incoming_bytes_fraction = dask.config.get(
751-
"distributed.worker.memory.transfer"
752-
)
753-
if (
754-
self.memory_manager.memory_limit is not None
755-
and transfer_incoming_bytes_fraction is not False
756-
):
757-
transfer_incoming_bytes_limit = int(
758-
self.memory_manager.memory_limit * transfer_incoming_bytes_fraction
759-
)
760748
state = WorkerState(
761749
nthreads=nthreads,
762750
data=self.memory_manager.data,
@@ -766,7 +754,6 @@ def __init__(
766754
transfer_incoming_count_limit=transfer_incoming_count_limit,
767755
validate=validate,
768756
transition_counter_max=transition_counter_max,
769-
transfer_incoming_bytes_limit=transfer_incoming_bytes_limit,
770757
)
771758
BaseWorker.__init__(self, state)
772759

0 commit comments

Comments
 (0)