Skip to content

Commit adecbcd

Browse files
committed
pytest_plugin(feat): Add atomic locking for pytest-xdist compatibility
why: Fixtures were sporadically slow because of race conditions when multiple pytest-xdist workers competed to initialize the shared cache. Each worker would see the marker file missing, start the expensive initialization, and potentially corrupt the other workers' work. what: - Add filelock-inspired atomic locking using the os.O_CREAT|os.O_EXCL pattern - Fix the TTL boundary race by using a 23.5h TTL instead of 24h, plus atomic file operations - Remove the aggressive cache cleanup that deleted other workers' caches - Add an _atomic_repo_init() helper with stale lock detection (5min timeout) - Apply atomic init to git_remote_repo, hg_remote_repo, svn_remote_repo, svn_remote_repo_with_files Based on patterns from: pytest (_pytest/pathlib.py), filelock (SoftFileLock)
1 parent 0c3d1ca commit adecbcd

File tree

1 file changed

+176
-70
lines changed

1 file changed

+176
-70
lines changed

src/libvcs/pytest_plugin.py

Lines changed: 176 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import asyncio
6+
import contextlib
67
import dataclasses
78
import functools
89
import getpass
@@ -148,24 +149,147 @@ def get_vcs_version(cmd: list[str]) -> str:
148149
return "not-installed"
149150

150151

152+
# Age, in seconds, after which a lock file is treated as stale and may be
# removed by a waiting worker (5 minutes - covers slow hg operations).
# Compared against ``time.time() - st_mtime`` of the lock file.
_LOCK_TIMEOUT: int = 5 * 60
154+
155+
156+
def _acquire_lock(lock_path: pathlib.Path) -> int | None:
    """Atomically acquire lock file. Returns fd if acquired, None otherwise.

    Uses filelock's SoftFileLock pattern: os.O_CREAT | os.O_EXCL for atomicity.
    The open fails with ``FileExistsError`` when the lock file already exists,
    which is reported to the caller as ``None``.

    Parameters
    ----------
    lock_path : pathlib.Path
        Path of the lock file to create.

    Returns
    -------
    int | None
        Open file descriptor of the newly created lock file, or None if the
        lock file already exists.

    Raises
    ------
    OSError
        If the lock file cannot be created for a reason other than already
        existing, or if writing the PID fails. In the latter case the
        descriptor is closed and the lock file removed before re-raising.
    """
    try:
        fd = os.open(
            str(lock_path),
            os.O_WRONLY | os.O_CREAT | os.O_EXCL,
            0o644,
        )
    except FileExistsError:
        return None

    try:
        # Write PID for debugging stale locks
        os.write(fd, str(os.getpid()).encode())
    except OSError:
        # Do not leak the descriptor or leave an orphaned lock behind: other
        # workers would otherwise wait on a lock nobody is going to release.
        os.close(fd)
        with contextlib.suppress(OSError):
            lock_path.unlink()
        raise
    return fd
173+
174+
175+
def _release_lock(lock_path: pathlib.Path, fd: int) -> None:
    """Close the lock's file descriptor and remove the lock file.

    Parameters
    ----------
    lock_path : pathlib.Path
        Path of the lock file to remove.
    fd : int
        Open file descriptor for the lock file.
    """
    os.close(fd)
    try:
        lock_path.unlink()
    except OSError:
        # Best effort: any OSError from the removal (for example the file
        # already being gone) is deliberately ignored.
        pass
180+
181+
182+
def _is_lock_stale(lock_path: pathlib.Path) -> bool:
    """Report whether the lock file is older than ``_LOCK_TIMEOUT`` seconds.

    A lock whose metadata cannot be read (``stat()`` raises ``OSError``, e.g.
    the file no longer exists) is also reported as stale.

    Parameters
    ----------
    lock_path : pathlib.Path
        Path of the lock file to inspect.

    Returns
    -------
    bool
        True if the lock's modification time is more than ``_LOCK_TIMEOUT``
        seconds in the past or the file cannot be stat'ed, False otherwise.
    """
    try:
        modified_at = lock_path.stat().st_mtime
    except OSError:
        return True
    return time.time() - modified_at > _LOCK_TIMEOUT
189+
190+
191+
def _atomic_repo_init(
    repo_path: pathlib.Path,
    init_fn: t.Callable[[], None],
    marker_name: str = ".libvcs_initialized",
    timeout: float = 60.0,
    poll_interval: float = 0.05,
) -> bool:
    """Atomically initialize a repository with file-based lock coordination.

    Uses filelock-inspired pattern for pytest-xdist worker coordination.
    Two-file approach: .lock (temporary) vs marker (permanent). The lock file
    lives next to the repository as ``.<repo name>.lock``; the marker lives
    inside the repository directory and is only created after ``init_fn``
    returned successfully.

    Parameters
    ----------
    repo_path : pathlib.Path
        Path to the repository directory to initialize
    init_fn : Callable[[], None]
        Function to call to perform initialization (creates repo_path)
    marker_name : str
        Name of the marker file indicating successful completion
    timeout : float
        Maximum seconds to wait for another process to complete
    poll_interval : float
        Seconds between polling attempts (default 50ms like filelock)

    Returns
    -------
    bool
        True if this process performed initialization, False if waited for another

    Raises
    ------
    TimeoutError
        If the lock is held by another process and the marker does not appear
        within ``timeout`` seconds.

    Notes
    -----
    Any exception raised by ``init_fn`` propagates to the caller after the
    lock has been released; the marker is not written in that case.
    """
    marker = repo_path / marker_name
    lock_path = repo_path.parent / f".{repo_path.name}.lock"

    # Fast path: already initialized
    if marker.exists():
        return False

    # Ensure parent directory exists for lock file
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    start_time = time.perf_counter()

    while True:
        # Try to acquire lock
        fd = _acquire_lock(lock_path)

        if fd is not None:
            # We got the lock
            try:
                # Double-check marker (another process may have finished)
                if marker.exists():
                    return False
                # Clean partial state and initialize
                if repo_path.exists():
                    shutil.rmtree(repo_path)
                init_fn()
                marker.touch()
                return True
            finally:
                _release_lock(lock_path, fd)

        # Lock held by another process - check if done or stale
        if marker.exists():
            return False

        if _is_lock_stale(lock_path):
            try:
                lock_path.unlink()
            except FileNotFoundError:
                # Lock vanished in the meantime - retry immediately
                continue
            except OSError:
                # Stale lock could not be removed (e.g. permissions). Fall
                # through to the timeout check and sleep instead of retrying
                # immediately, which would busy-loop forever.
                pass
            else:
                continue  # Stale lock removed - retry immediately

        # Timeout check
        if time.perf_counter() - start_time >= timeout:
            msg = f"Timeout waiting for {repo_path} initialization"
            raise TimeoutError(msg)

        time.sleep(poll_interval)
267+
268+
151269
def get_cache_key() -> str:
152270
"""Generate cache key from VCS versions and libvcs version.
153271
154272
The cache is invalidated when any VCS tool or libvcs version changes.
155-
Results are cached to disk with a 24-hour TTL to avoid slow `hg --version`
273+
Results are cached to disk with a ~23.5-hour TTL to avoid slow `hg --version`
156274
calls (which take ~100ms due to Python startup overhead).
275+
276+
Uses atomic file operations to prevent race conditions with parallel workers.
157277
"""
158278
base_dir = get_xdg_cache_dir()
159279
key_file = base_dir / ".cache_key"
160280

161-
# Return cached key if exists and is recent (within 24 hours)
162-
if key_file.exists():
163-
try:
164-
stat = key_file.stat()
165-
if time.time() - stat.st_mtime < 86400: # 24 hours
166-
return key_file.read_text().strip()
167-
except OSError:
168-
pass # File was deleted or inaccessible, regenerate
281+
# Try to return cached key (atomic read with full error handling)
282+
# No exists() check - let stat() fail naturally to avoid TOCTOU race
283+
try:
284+
stat = key_file.stat()
285+
# Use 23.5 hours (not 24) to avoid exact boundary race conditions
286+
if time.time() - stat.st_mtime < 84600:
287+
cached_key = key_file.read_text().strip()
288+
# Validate format before using (guards against corruption)
289+
if len(cached_key) == 12:
290+
return cached_key
291+
except (OSError, ValueError):
292+
pass # File missing, stale, corrupt, or race condition - regenerate
169293

170294
# Compute fresh key from VCS versions
171295
versions = [
@@ -177,10 +301,12 @@ def get_cache_key() -> str:
177301
version_str = "|".join(versions)
178302
cache_key = hashlib.sha256(version_str.encode()).hexdigest()[:12]
179303

180-
# Cache to disk for future runs
304+
# Atomic write: write to temp file, then rename (atomic on POSIX)
181305
try:
182306
base_dir.mkdir(parents=True, exist_ok=True)
183-
key_file.write_text(cache_key)
307+
tmp_file = base_dir / f".cache_key.{os.getpid()}.tmp"
308+
tmp_file.write_text(cache_key)
309+
tmp_file.rename(key_file)
184310
except OSError:
185311
pass # Cache write failed, continue without caching
186312

@@ -266,11 +392,11 @@ def libvcs_persistent_cache(request: pytest.FixtureRequest) -> pathlib.Path:
266392
if request.config.getoption("--libvcs-clear-cache") and base_dir.exists():
267393
shutil.rmtree(base_dir)
268394

269-
# Clean old cache versions (different keys)
270-
if base_dir.exists():
271-
for old_cache in base_dir.iterdir():
272-
if old_cache.is_dir() and old_cache.name != cache_key:
273-
shutil.rmtree(old_cache)
395+
# NOTE: Automatic cleanup of old cache versions removed to prevent race
396+
# conditions with pytest-xdist parallel workers. Old cache versions may
397+
# accumulate but won't cause issues. Users can clean manually:
398+
# rm -rf ~/.cache/libvcs-test/*
399+
# Or use: --libvcs-clear-cache
274400

275401
# Create cache directory
276402
cache_dir.mkdir(parents=True, exist_ok=True)
@@ -664,28 +790,22 @@ def git_remote_repo(
664790
"""Return cached Git remote repo with an initial commit.
665791
666792
Uses persistent XDG cache - repo persists across test sessions.
667-
Uses a marker file to ensure the commit was successfully created.
793+
Uses atomic file locking for pytest-xdist worker coordination.
668794
"""
669795
repo_path = remote_repos_path / "git_remote_repo"
670-
marker = repo_path / ".libvcs_initialized"
671796

672-
# Return cached repo if fully initialized (has marker file)
673-
if repo_path.exists() and marker.exists():
797+
# Fast path: already initialized
798+
if (repo_path / ".libvcs_initialized").exists():
674799
return repo_path
675800

676-
# Create from empty template
677-
if repo_path.exists():
678-
shutil.rmtree(repo_path)
679-
shutil.copytree(empty_git_repo, repo_path)
680-
681-
# Add initial commit
682-
git_remote_repo_single_commit_post_init(
683-
remote_repo_path=repo_path,
684-
env=git_commit_envvars,
685-
)
801+
def do_init() -> None:
802+
shutil.copytree(empty_git_repo, repo_path)
803+
git_remote_repo_single_commit_post_init(
804+
remote_repo_path=repo_path,
805+
env=git_commit_envvars,
806+
)
686807

687-
# Mark as fully initialized
688-
marker.touch()
808+
_atomic_repo_init(repo_path, do_init)
689809
return repo_path
690810

691811

@@ -798,22 +918,18 @@ def svn_remote_repo(
798918
"""Return cached SVN remote repo.
799919
800920
Uses persistent XDG cache - repo persists across test sessions.
801-
Uses a marker file to ensure initialization was successful.
921+
Uses atomic file locking for pytest-xdist worker coordination.
802922
"""
803923
repo_path = remote_repos_path / "svn_remote_repo"
804-
marker = repo_path / ".libvcs_initialized"
805924

806-
# Return cached repo if fully initialized (has marker file)
807-
if repo_path.exists() and marker.exists():
925+
# Fast path: already initialized
926+
if (repo_path / ".libvcs_initialized").exists():
808927
return repo_path
809928

810-
# Create from empty template
811-
if repo_path.exists():
812-
shutil.rmtree(repo_path)
813-
shutil.copytree(empty_svn_repo, repo_path)
929+
def do_init() -> None:
930+
shutil.copytree(empty_svn_repo, repo_path)
814931

815-
# Mark as fully initialized
816-
marker.touch()
932+
_atomic_repo_init(repo_path, do_init)
817933
return repo_path
818934

819935

@@ -826,24 +942,19 @@ def svn_remote_repo_with_files(
826942
"""Return cached SVN remote repo with files committed.
827943
828944
Uses persistent XDG cache - repo persists across test sessions.
829-
Uses a marker file to ensure the commit was successfully created.
945+
Uses atomic file locking for pytest-xdist worker coordination.
830946
"""
831947
repo_path = remote_repos_path / "svn_remote_repo_with_files"
832-
marker = repo_path / ".libvcs_initialized"
833948

834-
# Return cached repo if fully initialized (has marker file)
835-
if repo_path.exists() and marker.exists():
949+
# Fast path: already initialized
950+
if (repo_path / ".libvcs_initialized").exists():
836951
return repo_path
837952

838-
# Create from base svn_remote_repo
839-
if repo_path.exists():
840-
shutil.rmtree(repo_path)
841-
shutil.copytree(svn_remote_repo, repo_path)
953+
def do_init() -> None:
954+
shutil.copytree(svn_remote_repo, repo_path)
955+
svn_remote_repo_single_commit_post_init(remote_repo_path=repo_path)
842956

843-
svn_remote_repo_single_commit_post_init(remote_repo_path=repo_path)
844-
845-
# Mark as fully initialized
846-
marker.touch()
957+
_atomic_repo_init(repo_path, do_init)
847958
return repo_path
848959

849960

@@ -946,28 +1057,23 @@ def hg_remote_repo(
9461057
"""Return cached Mercurial remote repo with an initial commit.
9471058
9481059
Uses persistent XDG cache - repo persists across test sessions.
949-
Uses a marker file to ensure the commit was successfully created.
1060+
Uses atomic file locking for pytest-xdist worker coordination.
9501061
"""
9511062
repo_path = remote_repos_path / "hg_remote_repo"
952-
marker = repo_path / ".libvcs_initialized"
9531063

954-
# Return cached repo if fully initialized (has marker file)
955-
if repo_path.exists() and marker.exists():
1064+
# Fast path: already initialized
1065+
if (repo_path / ".libvcs_initialized").exists():
9561066
return repo_path
9571067

958-
# Create from empty template
959-
if repo_path.exists():
960-
shutil.rmtree(repo_path)
961-
shutil.copytree(empty_hg_repo, repo_path)
962-
963-
# Add initial commit (slow: ~288ms due to hg add + commit)
964-
hg_remote_repo_single_commit_post_init(
965-
remote_repo_path=repo_path,
966-
env={"HGRCPATH": str(hgconfig)},
967-
)
1068+
def do_init() -> None:
1069+
shutil.copytree(empty_hg_repo, repo_path)
1070+
# Add initial commit (slow: ~288ms due to hg add + commit)
1071+
hg_remote_repo_single_commit_post_init(
1072+
remote_repo_path=repo_path,
1073+
env={"HGRCPATH": str(hgconfig)},
1074+
)
9681075

969-
# Mark as fully initialized
970-
marker.touch()
1076+
_atomic_repo_init(repo_path, do_init)
9711077
return repo_path
9721078

9731079

0 commit comments

Comments
 (0)