Feature/synced collection/cleanup #466

Merged
merged 10 commits on Jan 6, 2021
24 changes: 24 additions & 0 deletions doc/api.rst
@@ -208,6 +208,22 @@ synced\_collections.buffered\_collection module
:undoc-members:
:show-inheritance:

synced\_collections.file\_buffered\_collection module
-----------------------------------------------------

.. automodule:: signac.core.synced_collections.file_buffered_collection
:members:
:undoc-members:
:show-inheritance:

synced\_collections.memory\_buffered\_collection module
-------------------------------------------------------

.. automodule:: signac.core.synced_collections.memory_buffered_collection
:members:
:undoc-members:
:show-inheritance:

synced\_collections.caching module
----------------------------------

@@ -247,3 +263,11 @@ synced\_collections.collection\_zarr module
:members:
:undoc-members:
:show-inheritance:

synced\_collections.utils module
-------------------------------------------

.. automodule:: signac.core.synced_collections.utils
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion signac/core/synced_collections/collection_json.py
@@ -112,7 +112,7 @@ def _save_to_resource(self):
# replace that file with original file. We also enable this mode
# irrespective of the write_concern flag if we're running in
# multithreaded mode.
if self._write_concern or hasattr(type(self), "_locks"):
if self._write_concern or type(self)._threading_support_is_active:
dirname, filename = os.path.split(self._filename)
fn_tmp = os.path.join(dirname, f"._{uuid.uuid4()}_{filename}")
with open(fn_tmp, "wb") as tmpfile:
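As a side note on the hunk above: the write-concern branch follows the standard atomic-write recipe of serializing to a uniquely named temporary file in the same directory and then replacing the target in a single step. A minimal, self-contained sketch of that recipe (illustrative only, not signac's exact implementation):

```python
import json
import os
import uuid


def atomic_json_write(filename, data):
    """Write ``data`` as JSON to ``filename`` without ever exposing a partial file."""
    dirname, basename = os.path.split(filename)
    # A unique temporary name in the same directory keeps the final rename on one filesystem.
    fn_tmp = os.path.join(dirname, f"._{uuid.uuid4()}_{basename}")
    with open(fn_tmp, "wb") as tmpfile:
        tmpfile.write(json.dumps(data).encode())
    # os.replace is atomic, so readers see either the old or the new file, never a mix.
    os.replace(fn_tmp, filename)
```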
35 changes: 12 additions & 23 deletions signac/core/synced_collections/collection_mongodb.py
@@ -2,8 +2,6 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Implements a MongoDB SyncedCollection backend."""
from copy import deepcopy

import bson

from .synced_attr_dict import SyncedAttrDict
@@ -68,19 +66,6 @@ def _save_to_resource(self):
except bson.errors.InvalidDocument as err:
raise TypeError(str(err))

def _pseudo_deepcopy(self):
"""Return a copy of instance.

It is a pseudo implementation for `deepcopy` because
:py:class:`pymongo.collection.Collection` does not support `deepcopy` method.
"""
return type(self)(
collection=self._collection,
uid=self._uid,
data=self._to_base(),
parent=deepcopy(self._parent),
)

@property
def collection(self):
"""pymongo.collection.Collection: Get the collection being synced to."""
@@ -91,6 +76,10 @@ def uid(self): # noqa: D401
"""dict: Get the unique mapping used to identify this collection."""
return self._uid

def __deepcopy__(self, memo):
# The underlying MongoDB collection cannot be deepcopied.
raise TypeError("MongoDBCollection does not support deepcopying.")


class MongoDBDict(MongoDBCollection, SyncedAttrDict):
"""A dict-like mapping interface to a persistent document in a MongoDB collection.
@@ -116,10 +105,10 @@ class MongoDBDict(MongoDBCollection, SyncedAttrDict):

While the MongoDBDict object behaves like a dictionary, there are important
distinctions to remember. In particular, because operations are reflected
as changes to an underlying database, copying (even deep copying) a
MongoDBDict instance may exhibit unexpected behavior. If a true copy is
required, you should use the call operator to get a dictionary
representation, and if necessary construct a new MongoDBDict instance:
as changes to an underlying database, copying a MongoDBDict instance may
exhibit unexpected behavior. If a true copy is required, you should use the
call operator to get a dictionary representation, and if necessary
construct a new MongoDBDict instance:
``new_dict = MongoDBDict(old_dict())``.

Parameters
@@ -173,10 +162,10 @@ class MongoDBList(MongoDBCollection, SyncedList):

While the MongoDBList object behaves like a list, there are important
distinctions to remember. In particular, because operations are reflected
as changes to an underlying database, copying (even deep copying) a
MongoDBList instance may exhibit unexpected behavior. If a true copy is
required, you should use the call operator to get a dictionary
representation, and if necessary construct a new MongoDBList instance:
as changes to an underlying database, copying a MongoDBList instance may
exhibit unexpected behavior. If a true copy is required, you should use the
call operator to get a list representation, and if necessary
construct a new MongoDBList instance:
``new_list = MongoDBList(old_list())``.

"""
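With ``_pseudo_deepcopy`` removed in favor of a ``__deepcopy__`` that raises, callers who need an independent snapshot should materialize the data first, as the docstrings above suggest. A small sketch of what that looks like in practice (``doc`` is a hypothetical, already-constructed ``MongoDBDict``):

```python
from copy import deepcopy

# `doc` is assumed to be an existing MongoDBDict synced to a pymongo collection.
try:
    deepcopy(doc)
except TypeError:
    # Expected after this change: the underlying pymongo collection cannot be deepcopied.
    pass

snapshot = doc()            # The call operator returns a plain, decoupled dict.
snapshot["new_field"] = 42  # Mutating the snapshot does not touch the database.
```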
44 changes: 13 additions & 31 deletions signac/core/synced_collections/collection_redis.py
@@ -3,7 +3,6 @@
# This software is licensed under the BSD 3-Clause License.
"""Implements a Redis SyncedCollection backend."""
import json
from copy import deepcopy

from .synced_attr_dict import SyncedAttrDict
from .synced_collection import SyncedCollection
@@ -48,26 +47,6 @@ def _save_to_resource(self):
"""Write the data to a Redis database."""
self._client.set(self._key, json.dumps(self._to_base()).encode())

def _pseudo_deepcopy(self):
"""Return a copy of instance.

It is a pseudo implementation for `deepcopy` because
`redis.Redis` does not support `deepcopy` method.
"""
if self._parent is not None:
# TODO: Do we really want a deep copy of a nested collection to
# deep copy the parent? Perhaps we should simply disallow this?
return type(self)(
client=None,
key=None,
data=self._to_base(),
parent=deepcopy(self._parent),
)
else:
return type(self)(
client=self._client, key=self._key, data=None, parent=None
)

@property
def client(self):
"""`redis.Redis`: The Redis client used to store the data."""
@@ -78,6 +57,10 @@ def key(self):
"""str: The key associated with this collection stored in Redis."""
return self._key

def __deepcopy__(self, memo):
# The underlying Redis client cannot be deepcopied.
raise TypeError("RedisCollection does not support deepcopying.")


class RedisDict(RedisCollection, SyncedAttrDict):
"""A dict-like mapping interface to a persistent Redis-database.
@@ -114,11 +97,10 @@ class RedisDict(RedisCollection, SyncedAttrDict):

While the RedisDict object behaves like a dictionary, there are important
distinctions to remember. In particular, because operations are reflected
as changes to an underlying database, copying (even deep copying) a
RedisDict instance may exhibit unexpected behavior. If a true copy is
required, you should use the call operator to get a dictionary
representation, and if necessary construct a new RedisDict instance:
``new_dict = RedisDict(old_dict())``.
as changes to an underlying database, copying a RedisDict instance may
exhibit unexpected behavior. If a true copy is required, you should use the
call operator to get a dictionary representation, and if necessary
construct a new RedisDict instance: ``new_dict = RedisDict(old_dict())``.

"""

@@ -144,11 +126,11 @@ class RedisList(RedisCollection, SyncedList):

While the RedisList object behaves like a list, there are
important distinctions to remember. In particular, because operations
are reflected as changes to an underlying database, copying (even deep
copying) a RedisList instance may exhibit unexpected behavior. If a
true copy is required, you should use the call operator to get a
dictionary representation, and if necessary construct a new RedisList
instance: `new_list = RedisList(old_list())`.
are reflected as changes to an underlying database, copying a RedisList
instance may exhibit unexpected behavior. If a true copy is required,
you should use the call operator to get a list representation,
and if necessary construct a new RedisList instance:
``new_list = RedisList(old_list())``.

Parameters
----------
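For orientation, ``_save_to_resource`` above stores the whole collection as a single JSON-encoded blob under one Redis key, so the raw state can be inspected with any Redis client. A rough sketch, assuming a local Redis server and an illustrative key name:

```python
import json

import redis

client = redis.Redis()               # assumes a server on localhost:6379
raw = client.get("signac_document")  # bytes written by _save_to_resource, or None
state = json.loads(raw.decode()) if raw is not None else None
print(state)
```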
21 changes: 14 additions & 7 deletions signac/core/synced_collections/collection_zarr.py
@@ -4,7 +4,6 @@
"""Implements a Zarr SyncedCollection backend."""
from copy import deepcopy

# TODO: Give a clearer error if the numcodecs import fails.
import numcodecs

from .synced_attr_dict import SyncedAttrDict
@@ -26,17 +25,18 @@ class ZarrCollection(SyncedCollection):
The Zarr group in which to store data.
name : str
The name under which this collection is stored in the Zarr group.
codec : numcodecs.abc.Codec
The encoding mechanism for the data. If not provided, defaults to JSON
encoding (Default value: None).

"""

_backend = __name__ # type: ignore

def __init__(self, group=None, name=None, **kwargs):
def __init__(self, group=None, name=None, codec=None, **kwargs):
self._root = group
# TODO: Give users control over the codec. If we use JSON encoding,
# then we should also use the JSON validator.
self._object_codec = numcodecs.JSON()
self._name = name
self._object_codec = numcodecs.JSON() if codec is None else codec
super().__init__(**kwargs)

def _load_from_resource(self):
@@ -68,8 +68,6 @@ def _save_to_resource(self):

def __deepcopy__(self, memo):
if self._parent is not None:
# TODO: Do we really want a deep copy of a nested collection to
# deep copy the parent? Perhaps we should simply disallow this?
return type(self)(
group=None,
name=None,
@@ -84,6 +82,15 @@ def __deepcopy__(self, memo):
parent=None,
)

@property
def codec(self):
"""numcodecs.abc.Codec: The encoding method used for the data."""
return self._object_codec

@codec.setter
def codec(self, new_codec):
self._object_codec = new_codec

@property
def group(self):
"""zarr.hierarchy.Group: The Zarr group storing the data."""
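The new ``codec`` parameter lets callers choose the numcodecs object codec used to serialize the collection instead of always getting ``numcodecs.JSON()``. A hedged usage sketch, assuming the concrete ``ZarrDict`` class forwards keyword arguments to ``ZarrCollection`` (the group and names are illustrative):

```python
import numcodecs
import zarr

from signac.core.synced_collections.collection_zarr import ZarrDict

group = zarr.group()  # in-memory group, good enough for illustration

# Default behavior: JSON object codec.
doc = ZarrDict(group=group, name="doc")
doc["a"] = 1

# Explicitly supply a different object codec, e.g. Pickle.
doc_pickled = ZarrDict(group=group, name="doc_pickled", codec=numcodecs.Pickle())
doc_pickled["b"] = [1, 2, 3]
```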
21 changes: 21 additions & 0 deletions signac/core/synced_collections/file_buffered_collection.py
@@ -47,6 +47,18 @@ class FileBufferedCollection(BufferedCollection):
process, but this is dependent on its constructor being called before
those of other classes.

Warnings
--------
Although it can be done safely, in general modifying two different collections
pointing to the same underlying resource while they are in different buffering
modes is unsupported and can lead to undefined behavior. This class makes a
best effort at performing safe modifications, but it is possible to construct
nested buffered contexts for different objects that lead to an invalid buffer
state, or even to situations where there is no obvious indicator of which data
is the canonical source of truth. In general, if you need multiple objects
pointing to the same resource, it is **strongly** recommended to keep all of
them in identical buffering states at all times.

"""

# Note for developers: since all subclasses share a single cache, all
@@ -295,6 +307,15 @@ def _load_from_buffer(self):
if id(self) not in FileBufferedCollection._cached_collections:
FileBufferedCollection._cached_collections[id(self)] = self
else:
# The first time this method is called, if nothing is in the buffer
# for this file then we cannot guarantee that the _data attribute
# is valid either since the resource could have been modified
# between when _data was last updated and when this load is being
# called. As a result, we have to load from the resource here to be
# safe.
data = self._load_from_resource()
with self._suspend_sync():
self._update(data)
self._initialize_data_in_cache()

# Load from buffer
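The new branch in ``_load_from_buffer`` guards against the kind of staleness sketched below: an in-memory mirror of a file can silently fall behind if something else rewrites the file, so the first buffered load re-reads the resource instead of trusting ``_data``. A standalone illustration that does not use the signac API:

```python
import json
import os
import tempfile

fd, path = tempfile.mkstemp(suffix=".json")
os.close(fd)
with open(path, "w") as f:
    json.dump({"a": 1}, f)

with open(path) as f:
    mirror = json.load(f)    # in-memory copy, analogous to ``_data``

with open(path, "w") as f:   # the file changes behind the mirror's back
    json.dump({"a": 2}, f)

with open(path) as f:
    fresh = json.load(f)     # what a fresh load from the resource would return

assert mirror != fresh       # seeding the buffer from ``mirror`` would capture stale data
os.remove(path)
```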
35 changes: 21 additions & 14 deletions signac/core/synced_collections/memory_buffered_collection.py
@@ -77,16 +77,22 @@ class SharedMemoryFileBufferedCollection(BufferedCollection):
process, but this is dependent on its constructor being called before
those of other classes.

Warnings
--------
Although it can be done safely, in general modifying two different collections
pointing to the same underlying resource while they are in different buffering
modes is unsupported and can lead to undefined behavior. This class makes a
best effort at performing safe modifications, but it is possible to construct
nested buffered contexts for different objects that lead to an invalid buffer
state, or even to situations where there is no obvious indicator of which data
is the canonical source of truth. In general, if you need multiple objects
pointing to the same resource, it is **strongly** recommended to keep all of
them in identical buffering states at all times.

"""

_cache: Dict[str, Dict[str, Union[bytes, str, Tuple[int, float, int]]]] = {}
_cached_collections: Dict[int, BufferedCollection] = {}
# TODO: Do we really care about the total number of objects stored in the
# buffer, or do we only care about objects that have been modified (and
# therefore require a file write when flushing)? For files that have been
# read but not modified, I don't think that there's any reason not to just
# let them sit in memory, or at least to give the user an option to allow
# that in case they know that virtual memory exhaustion won't be an issue.
_BUFFER_CAPACITY = 1000 # The number of collections to store in the buffer.
_CURRENT_BUFFER_SIZE = 0

@@ -180,9 +186,6 @@ def _flush(self, force=False):

"""
if not self._is_buffered or force:
# TODO: If we have two objects pointing to the same filename in the
# cache and one of them flushes before the other, need to decide
# how to handle it.
try:
_, stored_metadata = self._cached_collections.pop(id(self))
cached_data = self._cache[self._filename]
@@ -283,11 +286,15 @@ def _load_from_buffer(self):
self._cache[self._filename]["metadata"],
)
else:
# TODO: The first time we call _load_from_buffer we might need to call
# _load_from_resource. Otherwise, if something modified the file in memory
# since the last time that we performed any save/load operation, we could be
# putting an out-of-date state into the buffer. This also affects the
# FileBufferedCollection.
# The first time this method is called, if nothing is in the buffer
# for this file then we cannot guarantee that the _data attribute
# is valid either since the resource could have been modified
# between when _data was last updated and when this load is being
# called. As a result, we have to load from the resource here to be
# safe.
data = self._load_from_resource()
with self._suspend_sync():
self._update(data)
self._initialize_data_in_cache(modified=False)

# Set local data to the version in the buffer.
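``_BUFFER_CAPACITY`` above caps how many collections the shared in-memory buffer holds at once. The sketch below shows one simple way such a capacity-bounded, filename-keyed cache can work; it is only an illustration of the idea, not signac's actual eviction policy, which this diff does not show:

```python
from collections import OrderedDict


class BoundedBuffer:
    """A filename-keyed cache that evicts the oldest entry once a capacity is exceeded."""

    def __init__(self, capacity=1000):
        self._capacity = capacity
        self._entries = OrderedDict()

    def store(self, filename, contents):
        self._entries[filename] = contents
        self._entries.move_to_end(filename)    # most recently stored goes to the end
        if len(self._entries) > self._capacity:
            self._entries.popitem(last=False)  # drop the oldest entry
```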