diff --git a/py/_path/local.py b/py/_path/local.py index 8708a7c8..194ebed4 100644 --- a/py/_path/local.py +++ b/py/_path/local.py @@ -4,7 +4,7 @@ from __future__ import with_statement from contextlib import contextmanager -import sys, os, re, atexit, io +import sys, os, re, atexit, io, uuid import py from py._path import common from py._path.common import iswin32, fspath @@ -804,7 +804,8 @@ def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3, """ return unique directory with a number greater than the current maximum one. The number is assumed to start directly after prefix. if keep is true directories with a number less than (maxnum-keep) - will be removed. + will be removed. If .lock files are used (lock_timeout non-zero), + algorithm is multi-process safe. """ if rootdir is None: rootdir = cls.get_temproot() @@ -819,37 +820,19 @@ def parse_num(path): except ValueError: pass - # compute the maximum number currently in use with the - # prefix - lastmax = None - while True: - maxnum = -1 - for path in rootdir.listdir(): - num = parse_num(path) - if num is not None: - maxnum = max(maxnum, num) - - # make the new directory - try: - udir = rootdir.mkdir(prefix + str(maxnum+1)) - except py.error.EEXIST: - # race condition: another thread/process created the dir - # in the meantime. Try counting again - if lastmax == maxnum: - raise - lastmax = maxnum - continue - break - - # put a .lock file in the new directory that will be removed at - # process exit - if lock_timeout: - lockfile = udir.join('.lock') + def create_lockfile(path): + """ exclusively create lockfile. 
Throws when failed """ mypid = os.getpid() + lockfile = path.join('.lock') if hasattr(lockfile, 'mksymlinkto'): lockfile.mksymlinkto(str(mypid)) else: - lockfile.write(str(mypid)) + lockfile.write(str(mypid), 'wx') + return lockfile + + def atexit_remove_lockfile(lockfile): + """ ensure lockfile is removed at process exit """ + mypid = os.getpid() def try_remove_lockfile(): # in a fork() situation, only the last process should # remove the .lock, otherwise the other processes run the @@ -864,19 +847,79 @@ def try_remove_lockfile(): pass atexit.register(try_remove_lockfile) + # compute the maximum number currently in use with the prefix + lastmax = None + while True: + maxnum = -1 + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None: + maxnum = max(maxnum, num) + + # make the new directory + try: + udir = rootdir.mkdir(prefix + str(maxnum+1)) + if lock_timeout: + lockfile = create_lockfile(udir) + atexit_remove_lockfile(lockfile) + except (py.error.EEXIST, py.error.ENOENT): + # race condition (1): another thread/process created the dir + # in the meantime - try again + # race condition (2): another thread/process spuriously acquired + # lock treating empty directory as candidate + # for removal - try again + if lastmax == maxnum: + raise + lastmax = maxnum + continue + break + + def get_mtime(path): + """ read file modification time """ + try: + return path.lstat().mtime + except py.error.Error: + pass + + garbage_prefix = prefix + 'garbage-' + + def is_garbage(path): + """ check if path denotes directory scheduled for removal """ + bn = path.basename + return bn.startswith(garbage_prefix) + # prune old directories - if keep: + udir_time = get_mtime(udir) + if keep and udir_time: for path in rootdir.listdir(): num = parse_num(path) if num is not None and num <= (maxnum - keep): - lf = path.join('.lock') try: - t1 = lf.lstat().mtime - t2 = lockfile.lstat().mtime - if not lock_timeout or abs(t2-t1) < lock_timeout: - continue # skip directories 
still locked - except py.error.Error: - pass # assume that it means that there is no 'lf' + # try acquiring lock to remove directory as exclusive user + if lock_timeout: + create_lockfile(path) + except (py.error.EEXIST, py.error.ENOENT): + path_time = get_mtime(path) + if not path_time: + # assume directory doesn't exist now + continue + if abs(udir_time - path_time) < lock_timeout: + # assume directory with lockfile exists + # and lock timeout hasn't expired yet + continue + + # path dir locked for exclusive use + # and scheduled for removal to avoid another thread/process + # treating it as a new directory or removal candidate + garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4())) + try: + path.rename(garbage_path) + garbage_path.remove(rec=1) + except KeyboardInterrupt: + raise + except: # this might be py.error.Error, WindowsError ... + pass + if is_garbage(path): try: path.remove(rec=1) except KeyboardInterrupt: diff --git a/testing/path/test_local.py b/testing/path/test_local.py index 196b9e9e..e3977fce 100644 --- a/testing/path/test_local.py +++ b/testing/path/test_local.py @@ -6,6 +6,7 @@ import pytest import os import sys +import multiprocessing from py.path import local import common @@ -41,6 +42,21 @@ def __fspath__(self): return FakeFSPathClass(os.path.join("this", "is", "a", "fake", "path")) +def batch_make_numbered_dirs(rootdir, repeats): + try: + for i in range(repeats): + dir_ = py.path.local.make_numbered_dir(prefix='repro-', rootdir=rootdir) + file_ = dir_.join('foo') + file_.write('%s' % i) + actual = int(file_.read()) + assert actual == i, 'int(file_.read()) is %s instead of %s' % (actual, i) + dir_.join('.lock').remove(ignore_errors=True) + return True + except KeyboardInterrupt: + # makes sure that interrupting test session won't hang it + os.exit(2) + + class TestLocalPath(common.CommonFSTests): def test_join_normpath(self, tmpdir): assert tmpdir.join(".") == tmpdir @@ -430,6 +446,13 @@ def notimpl(x, y): assert x.relto(tmpdir)
assert x.check() + def test_make_numbered_dir_multiprocess_safe(self, tmpdir): + # https://github.com/pytest-dev/py/issues/30 + pool = multiprocessing.Pool() + results = [pool.apply_async(batch_make_numbered_dirs, [tmpdir, 100]) for _ in range(20)] + for r in results: + assert r.get() == True + def test_locked_make_numbered_dir(self, tmpdir): for i in range(10): numdir = local.make_numbered_dir(prefix='base2.', rootdir=tmpdir,