Skip to content

Commit f6610e3

Browse files
bpiwowarclaude
andcommitted
test: add concurrent partial locking test
Add test for two processes competing for the same partial, similar to test_token_reschedule. This verifies that partial locking correctly serializes access when multiple processes try to use the same partial directory. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent f25e18d commit f6610e3

File tree

3 files changed

+166
-3
lines changed

3 files changed

+166
-3
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""Script for running partial task in subprocess for concurrent testing"""
2+
3+
if __name__ == "__main__":
4+
import sys
5+
import logging
6+
from pathlib import Path
7+
import time
8+
9+
from experimaestro.scheduler import JobState
10+
from experimaestro.tests.utils import TemporaryExperiment
11+
from experimaestro.tests.task_partial import PartialTask
12+
13+
root = logging.getLogger()
14+
root.setLevel(logging.INFO)
15+
logging.getLogger("xpm").setLevel(logging.DEBUG)
16+
17+
workdir, x, lockingpath, readypath, timepath = sys.argv[1:]
18+
19+
handler = logging.StreamHandler()
20+
bf = logging.Formatter(
21+
f"[XP{x}] [%(levelname)s] %(asctime)s.%(msecs)03d %(name)s "
22+
f"[%(process)d/%(threadName)s]: %(message)s",
23+
datefmt="%H:%M:%S",
24+
)
25+
handler.setFormatter(bf)
26+
root.handlers.clear()
27+
root.addHandler(handler)
28+
29+
with TemporaryExperiment("partial_reschedule%s" % x, workdir=workdir) as xp:
30+
logging.info("Partial reschedule [%s]: starting task in %s", x, workdir)
31+
task = PartialTask.C(path=lockingpath, x=int(x)).submit()
32+
33+
logging.info("Waiting for task (partial with %s) to be scheduled", lockingpath)
34+
while task.job.state == JobState.UNSCHEDULED:
35+
time.sleep(0.01)
36+
37+
# Write so that the test knows we are ready
38+
Path(readypath).write_text("hello")
39+
logging.info("Partial reschedule [%s]: ready", x)
40+
41+
# Wait until the experiment finishes
42+
task.__xpm__.task.job.wait()
43+
logging.info("Partial reschedule [%s]: finished", x)
44+
45+
# Write the timestamp from the task so the test can retrieve them easily
46+
Path(timepath).write_text(Path(task.stdout()).read_text())
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""Task definitions for partial locking tests"""
2+
3+
from pathlib import Path
4+
import time
5+
from experimaestro import Task, Param, Meta, field, PathGenerator, partial, param_group
6+
import logging
7+
8+
logging.basicConfig(level=logging.INFO)
9+
10+
# Define parameter groups
11+
iter_group = param_group("iter")
12+
13+
14+
class PartialTask(Task):
15+
"""Task that uses partial and waits for a file before completing"""
16+
17+
# Define a partial set
18+
checkpoints = partial(exclude_groups=[iter_group])
19+
20+
# Parameter in iter_group - excluded from partial identifier
21+
x: Param[int] = field(groups=[iter_group])
22+
23+
# The path to watch before completing
24+
path: Param[Path]
25+
26+
# Path generated using the partial identifier
27+
checkpoint_path: Meta[Path] = field(
28+
default_factory=PathGenerator("checkpoint", partial=checkpoints)
29+
)
30+
31+
def execute(self):
32+
print(time.time()) # noqa: T201
33+
# Create checkpoint directory
34+
self.checkpoint_path.mkdir(parents=True, exist_ok=True)
35+
# Wait for signal file
36+
while not self.path.is_file():
37+
time.sleep(0.1)
38+
print(time.time()) # noqa: T201

src/experimaestro/tests/test_partial_paths.py

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,7 @@ def test_shared_partial_not_orphaned():
176176
from experimaestro.scheduler.db_state_provider import DbStateProvider
177177

178178
with TemporaryDirectory(prefix="xpm", suffix="partial_shared_cleanup") as workdir:
179-
with TemporaryExperiment(
180-
"partial_shared_cleanup", workdir=workdir, maxwait=30
181-
):
179+
with TemporaryExperiment("partial_shared_cleanup", workdir=workdir, maxwait=30):
182180
# Submit two tasks with same learning_rate (same partial)
183181
task1 = TaskWithPartial.C(max_iter=100, learning_rate=0.1).submit()
184182
task2 = TaskWithPartial.C(max_iter=200, learning_rate=0.1).submit()
@@ -217,3 +215,84 @@ def test_shared_partial_not_orphaned():
217215
assert not checkpoint_path.exists()
218216
finally:
219217
provider.close()
218+
219+
220+
def test_partial_concurrent_processes():
221+
"""Test that two processes competing for the same partial are serialized.
222+
223+
Similar to test_token_reschedule but for partial locking:
224+
- Two tasks with different x (excluded param) share the same partial
225+
- They should run sequentially (one after the other)
226+
"""
227+
import sys
228+
import subprocess
229+
import logging
230+
import time
231+
import pytest
232+
from .utils import TemporaryDirectory, timeout, get_times_frompath
233+
234+
with TemporaryDirectory("partial_reschedule") as workdir:
235+
lockingpath = workdir / "lockingpath"
236+
237+
command = [
238+
sys.executable,
239+
Path(__file__).parent / "partial_reschedule.py",
240+
workdir,
241+
]
242+
243+
ready1 = workdir / "ready.1"
244+
time1 = workdir / "time.1"
245+
p1 = subprocess.Popen(
246+
command + ["1", str(lockingpath), str(ready1), str(time1)]
247+
)
248+
249+
ready2 = workdir / "ready.2"
250+
time2 = workdir / "time.2"
251+
p2 = subprocess.Popen(
252+
command + ["2", str(lockingpath), str(ready2), str(time2)]
253+
)
254+
255+
try:
256+
with timeout(30):
257+
logging.info("Waiting for both experiments to be ready")
258+
# Wait that both processes are ready
259+
while not ready1.is_file():
260+
time.sleep(0.01)
261+
while not ready2.is_file():
262+
time.sleep(0.01)
263+
264+
# Create the locking path to allow tasks to finish
265+
logging.info(
266+
"Both processes are ready: allowing tasks to finish by writing in %s",
267+
lockingpath,
268+
)
269+
lockingpath.write_text("Let's go")
270+
271+
# Waiting for the output
272+
logging.info("Waiting for XP1 to finish (%s)", time1)
273+
while not time1.is_file():
274+
time.sleep(0.01)
275+
logging.info("Experiment 1 finished")
276+
277+
logging.info("Waiting for XP2 to finish")
278+
while not time2.is_file():
279+
time.sleep(0.01)
280+
logging.info("Experiment 2 finished")
281+
282+
time1_val = get_times_frompath(time1)
283+
time2_val = get_times_frompath(time2)
284+
285+
logging.info("%s vs %s", time1_val, time2_val)
286+
# One should have finished before the other started
287+
# (they share the same partial, so only one can run at a time)
288+
assert time1_val > time2_val or time2_val > time1_val
289+
except TimeoutError:
290+
p1.terminate()
291+
p2.terminate()
292+
pytest.fail("Timeout")
293+
294+
except Exception:
295+
logging.warning("Other exception: killing processes (just in case)")
296+
p1.terminate()
297+
p2.terminate()
298+
raise

0 commit comments

Comments
 (0)