Skip to content

Commit

Permalink
rdir: threads support
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Kasarherou committed Feb 2, 2017
1 parent 1809cef commit f478aa0
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 81 deletions.
1 change: 1 addition & 0 deletions BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ In addition, there some additional dependencies at runtime:
* libapache2-mod-wsgi (as named on Ubuntu), the WSGI module for apache2
* python-cliff
* python-pyeclib
* python-futures

The account service will require an up and running backend:
* redis
Expand Down
1 change: 1 addition & 0 deletions all-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ simplejson>=2.0.9
plyvel>=0.9
PyYAML>=3.10
PyECLib>=1.2.0
futures>=3.0.5
7 changes: 4 additions & 3 deletions oio/common/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from gunicorn.app.base import BaseApplication
from gunicorn.glogging import Logger

from oio.common.utils import get_logger, read_conf
from oio.common.utils import get_logger, read_conf, CPU_COUNT


class Application(BaseApplication):
Expand All @@ -19,13 +19,14 @@ def load_config(self):
self.conf.get('bind_port', '8000'))
self.cfg.set('bind', bind)
self.cfg.set('backlog', self.conf.get('backlog', 2048))
self.cfg.set('workers', self.conf.get('workers', 2))
self.cfg.set('workers', self.conf.get('workers', 1))
self.cfg.set('threads', self.conf.get('threads', CPU_COUNT))
self.cfg.set('worker_class', self.conf.get('worker_class', 'eventlet'))
self.cfg.set('worker_connections', self.conf.get(
'worker_connections', 1000))
self.cfg.set('syslog_prefix', self.conf.get('syslog_prefix', ''))
self.cfg.set('syslog_addr', self.conf.get('log_address', '/dev/log'))
self.cfg.set('accesslog', '-')
self.cfg.set('accesslog', None)
self.cfg.set('access_log_format', self.conf.get('access_log_format',
self.access_log_fmt))
if self.logger_class:
Expand Down
62 changes: 25 additions & 37 deletions oio/rdir/server_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def _wrapped(self, volume_id, *args, **kwargs):
except plyvel.Error as e:
if 'does not exist' in e.message:
msg = "Volume '%s' does not exist." % volume_id
raise NoSuchDb(msg)
raise NoSuchDb(msg)
raise e
return _wrapped

# FIXME this class is not thread-safe (see _get_db, push, lock) but it
Expand All @@ -51,7 +52,7 @@ def __init__(self, conf):
self.db_path = conf.get('db_path')
self.dbs = {}
self.logger = get_logger(conf)
self.lock = threading.RLock()
self.dbLock = threading.RLock()
if not os.path.exists(self.db_path):
os.makedirs(self.db_path)

Expand All @@ -64,14 +65,11 @@ def create(self, volume_id):

@handle_db_not_found
def _get_db(self, volume_id):
try:
with self.lock:
db = self.dbs[volume_id]
except KeyError:
db_path = self._get_db_path(volume_id)
with self.lock:
with self.dbLock:
if volume_id not in self.dbs:
db_path = self._get_db_path(volume_id)
self.dbs[volume_id] = DB(db_path, create_if_missing=False)
db = self.dbs[volume_id]
db = self.dbs[volume_id]
return db

def _get_db_chunk(self, volume_id):
Expand All @@ -90,24 +88,16 @@ def chunk_push(self, volume_id,
# won't happen (often) and that they can be recovered with a
# subsequent upload
chunk_db = self._get_db_chunk(volume_id)
value = chunk_db.get(key)
if value is not None:
value = json.loads(value)
else:
value = dict()

for k, v in data.iteritems():
value[k] = v

if 'mtime' not in value: # not consistent
if 'rtime' in value:
if 'mtime' not in data: # not consistent
if 'rtime' in data:
                # In functional tests, we can encounter the case where rebuild
# update (rtime) arrives before creation update (first mtime)
value['mtime'] = value['rtime']
data['mtime'] = data['rtime']
else:
raise ServerException("mtime is mandatory")

value = json.dumps(value)
value = json.dumps(data)
chunk_db.put(key, value.encode('utf8'))

def chunk_delete(self, volume_id, container_id, content_id, chunk_id):
Expand Down Expand Up @@ -195,26 +185,24 @@ def admin_get_incident_date(self, volume_id):

def admin_clear(self, volume_id, clear_all):
        # FIXME we could benefit from a per-DB lock
with self.lock:
db = self._get_db_chunk(volume_id)
count = 0
for key, value in db:
if not clear_all:
data = json.loads(value)
if clear_all or 'rtime' in data:
count += 1
db.delete(key)
self._get_db_admin(volume_id).delete('incident_date')
db = self._get_db_chunk(volume_id)
count = 0
for key, value in db:
if not clear_all:
data = json.loads(value)
if clear_all or 'rtime' in data:
count += 1
db.delete(key)
self._get_db_admin(volume_id).delete('incident_date')
return count

def admin_lock(self, volume_id, who):
        # FIXME we could benefit from a per-DB lock
with self.lock:
ret = self._get_db_admin(volume_id).get('lock')
if ret is not None:
return ret # already locked
self._get_db_admin(volume_id).put('lock', who.encode('utf8'))
return None
ret = self._get_db_admin(volume_id).get('lock')
if ret is not None:
return ret # already locked
self._get_db_admin(volume_id).put('lock', who.encode('utf8'))
return None

def admin_unlock(self, volume_id):
self._get_db_admin(volume_id).delete('lock')
Expand Down
41 changes: 0 additions & 41 deletions tests/unit/rdir/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,47 +71,6 @@ def test_chunk_push_no_rtime_no_mtime(self):
self.volume, self.container_0,
self.content_0, self.chunk_0)

def test_chunk_push_update_data(self):
# initial push
self.rdir.chunk_push(self.volume, self.container_0, self.content_0,
self.chunk_0, rtime=5555, mtime=6666)
data = self.rdir.chunk_fetch(self.volume)
self.assertEqual(data, [
("%s|%s|%s" %
(self.container_0, self.content_0, self.chunk_0),
{'mtime': 6666, 'rtime': 5555})
])

# update mtime and rtime
self.rdir.chunk_push(self.volume, self.container_0, self.content_0,
self.chunk_0, mtime=1111, rtime=2222)
data = self.rdir.chunk_fetch(self.volume)
self.assertEqual(data, [
("%s|%s|%s" %
(self.container_0, self.content_0, self.chunk_0),
{'mtime': 1111, 'rtime': 2222})
])

# update only mtime
self.rdir.chunk_push(self.volume, self.container_0, self.content_0,
self.chunk_0, mtime=9999)
data = self.rdir.chunk_fetch(self.volume)
self.assertEqual(data, [
("%s|%s|%s" %
(self.container_0, self.content_0, self.chunk_0),
{'mtime': 9999, 'rtime': 2222})
])

# update only rtime
self.rdir.chunk_push(self.volume, self.container_0, self.content_0,
self.chunk_0, rtime=7777)
data = self.rdir.chunk_fetch(self.volume)
self.assertEqual(data, [
("%s|%s|%s" %
(self.container_0, self.content_0, self.chunk_0),
{'mtime': 9999, 'rtime': 7777})
])

def test_chunk_delete(self):
self.rdir.chunk_push(self.volume, self.container_0, self.content_0,
self.chunk_0, rtime=5555, mtime=6666)
Expand Down

0 comments on commit f478aa0

Please sign in to comment.