Skip to content

Commit c96f330

Browse files
authored
[data] deflake sql + consumption + execution_optimizer + issue detection manager (#57545)
Cherry-pick of #57270.
Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
1 parent bf7133e commit c96f330

11 files changed

+1818
-1711
lines changed

python/ray/data/BUILD.bazel

Lines changed: 58 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,19 @@ py_test_module_list(
5454
],
5555
)
5656

57+
py_test_module_list(
58+
size = "small",
59+
files = glob(["tests/anyscale/test_*.py"]),
60+
tags = [
61+
"exclusive",
62+
"team:data",
63+
],
64+
deps = [
65+
":conftest",
66+
"//:ray_lib",
67+
],
68+
)
69+
5770
py_test(
5871
name = "test_formats",
5972
size = "medium",
@@ -650,6 +663,7 @@ py_test(
650663
size = "medium",
651664
srcs = ["tests/test_json.py"],
652665
tags = [
666+
"data_non_parallel",
653667
"exclusive",
654668
"team:data",
655669
],
@@ -942,9 +956,51 @@ py_test(
942956
)
943957

944958
py_test(
945-
name = "test_execution_optimizer",
959+
name = "test_execution_optimizer_basic",
960+
size = "medium",
961+
srcs = ["tests/test_execution_optimizer_basic.py"],
962+
tags = [
963+
"exclusive",
964+
"team:data",
965+
],
966+
deps = [
967+
":conftest",
968+
"//:ray_lib",
969+
],
970+
)
971+
972+
py_test(
973+
name = "test_execution_optimizer_advanced",
974+
size = "medium",
975+
srcs = ["tests/test_execution_optimizer_advanced.py"],
976+
tags = [
977+
"exclusive",
978+
"team:data",
979+
],
980+
deps = [
981+
":conftest",
982+
"//:ray_lib",
983+
],
984+
)
985+
986+
py_test(
987+
name = "test_execution_optimizer_integrations",
988+
size = "medium",
989+
srcs = ["tests/test_execution_optimizer_integrations.py"],
990+
tags = [
991+
"exclusive",
992+
"team:data",
993+
],
994+
deps = [
995+
":conftest",
996+
"//:ray_lib",
997+
],
998+
)
999+
1000+
py_test(
1001+
name = "test_execution_optimizer_limit_pushdown",
9461002
size = "medium",
947-
srcs = ["tests/test_execution_optimizer.py"],
1003+
srcs = ["tests/test_execution_optimizer_limit_pushdown.py"],
9481004
tags = [
9491005
"exclusive",
9501006
"team:data",

python/ray/data/tests/test_consumption.py

Lines changed: 74 additions & 66 deletions
Original file line number | Diff line number | Diff line change
@@ -1185,73 +1185,81 @@ def sort(r):
11851185
assert r1 == ds.take()
11861186

11871187

1188-
def test_iter_batches_grid(ray_start_regular_shared):
1188+
@pytest.mark.parametrize(
1189+
"block_sizes,batch_size,drop_last",
1190+
[
1191+
# Single block, batch smaller than block, keep partial
1192+
([10], 3, False),
1193+
# Single block, batch smaller than block, drop partial
1194+
([10], 3, True),
1195+
# Single block, exact division
1196+
([10], 5, False),
1197+
# Multiple equal-sized blocks, batch doesn't divide evenly, keep partial
1198+
([5, 5, 5], 7, False),
1199+
# Multiple equal-sized blocks, batch doesn't divide evenly, drop partial
1200+
([5, 5, 5], 7, True),
1201+
# Multiple unequal-sized blocks, keep partial
1202+
([1, 5, 10], 4, False),
1203+
# Multiple unequal-sized blocks, drop partial
1204+
([1, 5, 10], 4, True),
1205+
# Edge case: batch_size = 1
1206+
([5, 3, 7], 1, False),
1207+
# Edge case: batch larger than total rows
1208+
([2, 3, 4], 100, False),
1209+
# Exact division across multiple blocks
1210+
([6, 12, 18], 6, False),
1211+
],
1212+
)
1213+
def test_iter_batches_grid(
1214+
ray_start_regular_shared,
1215+
block_sizes,
1216+
batch_size,
1217+
drop_last,
1218+
):
11891219
# Tests slicing, batch combining, and partial batch dropping logic over
1190-
# a grid of dataset, batching, and dropping configurations.
1191-
# Grid: num_blocks x num_rows_block_1 x ... x num_rows_block_N x
1192-
# batch_size x drop_last
1193-
seed = int(time.time())
1194-
print(f"Seeding RNG for test_iter_batches_grid with: {seed}")
1195-
random.seed(seed)
1196-
max_num_blocks = 20
1197-
max_num_rows_per_block = 20
1198-
num_blocks_samples = 3
1199-
block_sizes_samples = 3
1200-
batch_size_samples = 3
1201-
1202-
for num_blocks in np.random.randint(1, max_num_blocks + 1, size=num_blocks_samples):
1203-
block_sizes_list = [
1204-
np.random.randint(1, max_num_rows_per_block + 1, size=num_blocks)
1205-
for _ in range(block_sizes_samples)
1206-
]
1207-
for block_sizes in block_sizes_list:
1208-
# Create the dataset with the given block sizes.
1209-
dfs = []
1210-
running_size = 0
1211-
for block_size in block_sizes:
1212-
dfs.append(
1213-
pd.DataFrame(
1214-
{"value": list(range(running_size, running_size + block_size))}
1215-
)
1216-
)
1217-
running_size += block_size
1218-
num_rows = running_size
1219-
ds = ray.data.from_blocks(dfs)
1220-
for batch_size in np.random.randint(
1221-
1, num_rows + 1, size=batch_size_samples
1222-
):
1223-
for drop_last in (False, True):
1224-
batches = list(
1225-
ds.iter_batches(
1226-
batch_size=batch_size,
1227-
drop_last=drop_last,
1228-
batch_format="pandas",
1229-
)
1230-
)
1231-
if num_rows % batch_size == 0 or not drop_last:
1232-
# Number of batches should be equal to
1233-
# num_rows / batch_size, rounded up.
1234-
assert len(batches) == math.ceil(num_rows / batch_size)
1235-
# Concatenated batches should equal the DataFrame
1236-
# representation of the entire dataset.
1237-
assert pd.concat(batches, ignore_index=True).equals(
1238-
ds.to_pandas()
1239-
)
1240-
else:
1241-
# Number of batches should be equal to
1242-
# num_rows / batch_size, rounded down.
1243-
assert len(batches) == num_rows // batch_size
1244-
# Concatenated batches should equal the DataFrame
1245-
# representation of the dataset with the partial batch
1246-
# remainder sliced off.
1247-
assert pd.concat(batches, ignore_index=True).equals(
1248-
ds.to_pandas()[: batch_size * (num_rows // batch_size)]
1249-
)
1250-
if num_rows % batch_size == 0 or drop_last:
1251-
assert all(len(batch) == batch_size for batch in batches)
1252-
else:
1253-
assert all(len(batch) == batch_size for batch in batches[:-1])
1254-
assert len(batches[-1]) == num_rows % batch_size
1220+
# specific dataset, batching, and dropping configurations.
1221+
# Create the dataset with the given block sizes.
1222+
dfs = []
1223+
running_size = 0
1224+
for block_size in block_sizes:
1225+
dfs.append(
1226+
pd.DataFrame(
1227+
{"value": list(range(running_size, running_size + block_size))}
1228+
)
1229+
)
1230+
running_size += block_size
1231+
num_rows = running_size
1232+
ds = ray.data.from_blocks(dfs)
1233+
1234+
batches = list(
1235+
ds.iter_batches(
1236+
batch_size=batch_size,
1237+
drop_last=drop_last,
1238+
batch_format="pandas",
1239+
)
1240+
)
1241+
if num_rows % batch_size == 0 or not drop_last:
1242+
# Number of batches should be equal to
1243+
# num_rows / batch_size, rounded up.
1244+
assert len(batches) == math.ceil(num_rows / batch_size)
1245+
# Concatenated batches should equal the DataFrame
1246+
# representation of the entire dataset.
1247+
assert pd.concat(batches, ignore_index=True).equals(ds.to_pandas())
1248+
else:
1249+
# Number of batches should be equal to
1250+
# num_rows / batch_size, rounded down.
1251+
assert len(batches) == num_rows // batch_size
1252+
# Concatenated batches should equal the DataFrame
1253+
# representation of the dataset with the partial batch
1254+
# remainder sliced off.
1255+
assert pd.concat(batches, ignore_index=True).equals(
1256+
ds.to_pandas()[: batch_size * (num_rows // batch_size)]
1257+
)
1258+
if num_rows % batch_size == 0 or drop_last:
1259+
assert all(len(batch) == batch_size for batch in batches)
1260+
else:
1261+
assert all(len(batch) == batch_size for batch in batches[:-1])
1262+
assert len(batches[-1]) == num_rows % batch_size
12551263

12561264

12571265
def test_union(ray_start_regular_shared):

python/ray/data/tests/test_datasink.py

Lines changed: 50 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -28,54 +28,6 @@ def test_write_datasink(ray_start_regular_shared):
2828
assert ray.get(output.data_sink.get_rows_written.remote()) == 10
2929

3030

31-
class NodeLoggerOutputDatasink(Datasink[None]):
32-
"""A writable datasource that logs node IDs of write tasks, for testing."""
33-
34-
def __init__(self, node_id: str):
35-
36-
self.num_ok = 0
37-
self.num_failed = 0
38-
self.node_id = node_id
39-
self.num_rows_written = 0
40-
41-
def write(
42-
self,
43-
blocks: Iterable[Block],
44-
ctx: TaskContext,
45-
) -> None:
46-
47-
node_id = ray.get_runtime_context().get_node_id()
48-
assert node_id == self.node_id
49-
50-
def on_write_complete(self, write_result: WriteResult[None]):
51-
self.num_ok += 1
52-
self.num_rows_written += write_result.num_rows
53-
54-
def on_write_failed(self, error: Exception) -> None:
55-
self.num_failed += 1
56-
57-
58-
def test_write_datasink_ray_remote_args(ray_start_cluster):
59-
ray.shutdown()
60-
cluster = ray_start_cluster
61-
cluster.add_node(
62-
resources={"foo": 100},
63-
num_cpus=1,
64-
)
65-
bar_worker = cluster.add_node(resources={"bar": 100}, num_cpus=1)
66-
bar_node_id = bar_worker.node_id
67-
68-
ray.init(cluster.address)
69-
70-
output = NodeLoggerOutputDatasink(bar_node_id)
71-
ds = ray.data.range(100, override_num_blocks=10)
72-
# Pin write tasks to node with "bar" resource.
73-
ds.write_datasink(output, ray_remote_args={"resources": {"bar": 1}})
74-
assert output.num_ok == 1
75-
assert output.num_failed == 0
76-
assert output.num_rows_written == 100
77-
78-
7931
@pytest.mark.parametrize("min_rows_per_write", [25, 50])
8032
def test_min_rows_per_write(tmp_path, ray_start_regular_shared, min_rows_per_write):
8133
class MockDatasink(Datasink[None]):
@@ -122,8 +74,8 @@ def on_write_complete(self, write_result: WriteResult[CustomWriteResult]):
12274
self.num_rows = write_result.num_rows
12375
self.size_bytes = write_result.size_bytes
12476

125-
num_items = 100
126-
size_bytes_per_row = 1000
77+
num_items = 10
78+
size_bytes_per_row = 500
12779

12880
def map_fn(row):
12981
row["data"] = numpy.zeros(size_bytes_per_row, dtype=numpy.int8)
@@ -139,6 +91,54 @@ def map_fn(row):
13991
assert datasink.size_bytes == pytest.approx(num_items * size_bytes_per_row, rel=0.1)
14092

14193

94+
class NodeLoggerOutputDatasink(Datasink[None]):
95+
"""A writable datasource that logs node IDs of write tasks, for testing."""
96+
97+
def __init__(self, node_id: str):
98+
99+
self.num_ok = 0
100+
self.num_failed = 0
101+
self.node_id = node_id
102+
self.num_rows_written = 0
103+
104+
def write(
105+
self,
106+
blocks: Iterable[Block],
107+
ctx: TaskContext,
108+
) -> None:
109+
110+
node_id = ray.get_runtime_context().get_node_id()
111+
assert node_id == self.node_id
112+
113+
def on_write_complete(self, write_result: WriteResult[None]):
114+
self.num_ok += 1
115+
self.num_rows_written += write_result.num_rows
116+
117+
def on_write_failed(self, error: Exception) -> None:
118+
self.num_failed += 1
119+
120+
121+
def test_write_datasink_ray_remote_args(ray_start_cluster):
122+
ray.shutdown()
123+
cluster = ray_start_cluster
124+
cluster.add_node(
125+
resources={"foo": 100},
126+
num_cpus=1,
127+
)
128+
bar_worker = cluster.add_node(resources={"bar": 100}, num_cpus=1)
129+
bar_node_id = bar_worker.node_id
130+
131+
ray.init(cluster.address)
132+
133+
output = NodeLoggerOutputDatasink(bar_node_id)
134+
ds = ray.data.range(100, override_num_blocks=10)
135+
# Pin write tasks to node with "bar" resource.
136+
ds.write_datasink(output, ray_remote_args={"resources": {"bar": 1}})
137+
assert output.num_ok == 1
138+
assert output.num_failed == 0
139+
assert output.num_rows_written == 100
140+
141+
142142
if __name__ == "__main__":
143143
import sys
144144

0 commit comments

Comments
 (0)