Skip to content

Commit

Permalink
Merge pull request #143 from aurzenligl/master
Browse files Browse the repository at this point in the history
make_numbered_dir() multi-process-safe
  • Loading branch information
RonnyPfannschmidt authored Jul 25, 2017
2 parents 292e61f + bc408c9 commit 30f9978
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 37 deletions.
117 changes: 80 additions & 37 deletions py/_path/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import with_statement

from contextlib import contextmanager
import sys, os, re, atexit, io
import sys, os, re, atexit, io, uuid
import py
from py._path import common
from py._path.common import iswin32, fspath
Expand Down Expand Up @@ -804,7 +804,8 @@ def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3,
""" return unique directory with a number greater than the current
maximum one. The number is assumed to start directly after prefix.
if keep is true directories with a number less than (maxnum-keep)
will be removed.
will be removed. If .lock files are used (lock_timeout non-zero),
algorithm is multi-process safe.
"""
if rootdir is None:
rootdir = cls.get_temproot()
Expand All @@ -819,37 +820,19 @@ def parse_num(path):
except ValueError:
pass

# compute the maximum number currently in use with the
# prefix
lastmax = None
while True:
maxnum = -1
for path in rootdir.listdir():
num = parse_num(path)
if num is not None:
maxnum = max(maxnum, num)

# make the new directory
try:
udir = rootdir.mkdir(prefix + str(maxnum+1))
except py.error.EEXIST:
# race condition: another thread/process created the dir
# in the meantime. Try counting again
if lastmax == maxnum:
raise
lastmax = maxnum
continue
break

# put a .lock file in the new directory that will be removed at
# process exit
if lock_timeout:
lockfile = udir.join('.lock')
def create_lockfile(path):
""" exclusively create lockfile. Throws when failed """
mypid = os.getpid()
lockfile = path.join('.lock')
if hasattr(lockfile, 'mksymlinkto'):
lockfile.mksymlinkto(str(mypid))
else:
lockfile.write(str(mypid))
lockfile.write(str(mypid), 'wx')
return lockfile

def atexit_remove_lockfile(lockfile):
""" ensure lockfile is removed at process exit """
mypid = os.getpid()
def try_remove_lockfile():
# in a fork() situation, only the last process should
# remove the .lock, otherwise the other processes run the
Expand All @@ -864,19 +847,79 @@ def try_remove_lockfile():
pass
atexit.register(try_remove_lockfile)

# compute the maximum number currently in use with the prefix
lastmax = None
while True:
maxnum = -1
for path in rootdir.listdir():
num = parse_num(path)
if num is not None:
maxnum = max(maxnum, num)

# make the new directory
try:
udir = rootdir.mkdir(prefix + str(maxnum+1))
if lock_timeout:
lockfile = create_lockfile(udir)
atexit_remove_lockfile(lockfile)
except (py.error.EEXIST, py.error.ENOENT):
# race condition (1): another thread/process created the dir
# in the meantime - try again
# race condition (2): another thread/process spuriously acquired
# lock treating empty directory as candidate
# for removal - try again
if lastmax == maxnum:
raise
lastmax = maxnum
continue
break

def get_mtime(path):
""" read file modification time """
try:
return path.lstat().mtime
except py.error.Error:
pass

garbage_prefix = prefix + 'garbage-'

def is_garbage(path):
""" check if path denotes directory scheduled for removal """
bn = path.basename
return bn.startswith(garbage_prefix)

# prune old directories
if keep:
udir_time = get_mtime(udir)
if keep and udir_time:
for path in rootdir.listdir():
num = parse_num(path)
if num is not None and num <= (maxnum - keep):
lf = path.join('.lock')
try:
t1 = lf.lstat().mtime
t2 = lockfile.lstat().mtime
if not lock_timeout or abs(t2-t1) < lock_timeout:
continue # skip directories still locked
except py.error.Error:
pass # assume that it means that there is no 'lf'
# try acquiring lock to remove directory as exclusive user
if lock_timeout:
create_lockfile(path)
except (py.error.EEXIST, py.error.ENOENT):
path_time = get_mtime(path)
if not path_time:
# assume directory doesn't exist now
continue
if abs(udir_time - path_time) < lock_timeout:
# assume directory with lockfile exists
# and lock timeout hasn't expired yet
continue

# path dir locked for exlusive use
# and scheduled for removal to avoid another thread/process
# treating it as a new directory or removal candidate
garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4()))
try:
path.rename(garbage_path)
garbage_path.remove(rec=1)
except KeyboardInterrupt:
raise
except: # this might be py.error.Error, WindowsError ...
pass
if is_garbage(path):
try:
path.remove(rec=1)
except KeyboardInterrupt:
Expand Down
23 changes: 23 additions & 0 deletions testing/path/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
import os
import sys
import multiprocessing
from py.path import local
import common

Expand Down Expand Up @@ -41,6 +42,21 @@ def __fspath__(self):
return FakeFSPathClass(os.path.join("this", "is", "a", "fake", "path"))


def batch_make_numbered_dirs(rootdir, repeats):
try:
for i in range(repeats):
dir_ = py.path.local.make_numbered_dir(prefix='repro-', rootdir=rootdir)
file_ = dir_.join('foo')
file_.write('%s' % i)
actual = int(file_.read())
assert actual == i, 'int(file_.read()) is %s instead of %s' % (actual, i)
dir_.join('.lock').remove(ignore_errors=True)
return True
except KeyboardInterrupt:
# makes sure that interrupting test session won't hang it
os.exit(2)


class TestLocalPath(common.CommonFSTests):
def test_join_normpath(self, tmpdir):
assert tmpdir.join(".") == tmpdir
Expand Down Expand Up @@ -430,6 +446,13 @@ def notimpl(x, y):
assert x.relto(tmpdir)
assert x.check()

def test_make_numbered_dir_multiprocess_safe(self, tmpdir):
# https://github.com/pytest-dev/py/issues/30
pool = multiprocessing.Pool()
results = [pool.apply_async(batch_make_numbered_dirs, [tmpdir, 100]) for _ in range(20)]
for r in results:
assert r.get() == True

def test_locked_make_numbered_dir(self, tmpdir):
for i in range(10):
numdir = local.make_numbered_dir(prefix='base2.', rootdir=tmpdir,
Expand Down

0 comments on commit 30f9978

Please sign in to comment.