Skip to content

Commit

Permalink
fix: defer clearing global cache when in transaction (#660)
Browse files Browse the repository at this point in the history
* fix: defer clearing global cache when in transaction

When in a transaction, keys should only be cleared from the global cache
after the transaction has been committed.

Fixes #650 #657
  • Loading branch information
Chris Rossi authored Jun 7, 2021
1 parent 7dc11df commit 73020ed
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 46 deletions.
12 changes: 9 additions & 3 deletions google/cloud/ndb/_datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,10 @@ def put(entity, options):
key = None

if use_global_cache:
yield _cache.global_delete(cache_key)
if transaction:
context.global_cache_flush_keys.add(cache_key)
else:
yield _cache.global_delete(cache_key)

raise tasklets.Return(key)

Expand All @@ -406,6 +409,7 @@ def delete(key, options):
context = context_module.get_context()
use_global_cache = context._use_global_cache(key, options)
use_datastore = context._use_datastore(key, options)
transaction = context.transaction

if use_global_cache:
cache_key = _cache.global_cache_key(key)
Expand All @@ -414,7 +418,6 @@ def delete(key, options):
if use_global_cache:
yield _cache.global_lock(cache_key)

transaction = context.transaction
if transaction:
batch = _get_commit_batch(transaction, options)
else:
Expand All @@ -423,7 +426,10 @@ def delete(key, options):
yield batch.delete(key)

if use_global_cache:
yield _cache.global_delete(cache_key)
if transaction:
context.global_cache_flush_keys.add(cache_key)
else:
yield _cache.global_delete(cache_key)


class _NonTransactionalCommitBatch(object):
Expand Down
7 changes: 6 additions & 1 deletion google/cloud/ndb/_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def transaction_async_(
@tasklets.tasklet
def _transaction_async(context, callback, read_only=False):
# Avoid circular import in Python 2.7
from google.cloud.ndb import _cache
from google.cloud.ndb import _datastore_api

# Start the transaction
Expand Down Expand Up @@ -281,6 +282,7 @@ def run_inner_loop(inner_context):

context.eventloop.add_idle(run_inner_loop, tx_context)

tx_context.global_cache_flush_keys = flush_keys = set()
with tx_context.use():
try:
# Run the callback
Expand All @@ -301,7 +303,10 @@ def run_inner_loop(inner_context):
yield _datastore_api.rollback(transaction_id)
raise e

tx_context._clear_global_cache()
# Flush keys of entities written during the transaction from the global cache
if flush_keys:
yield [_cache.global_delete(key) for key in flush_keys]

for callback in on_commit_callbacks:
callback()

Expand Down
23 changes: 3 additions & 20 deletions google/cloud/ndb/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from google.cloud.ndb import _eventloop
from google.cloud.ndb import exceptions
from google.cloud.ndb import key as key_module
from google.cloud.ndb import tasklets


try: # pragma: NO PY2 COVER
Expand Down Expand Up @@ -242,6 +241,7 @@ def __new__(
cache=None,
cache_policy=None,
global_cache=None,
global_cache_flush_keys=None,
global_cache_policy=None,
global_cache_timeout_policy=None,
datastore_policy=None,
Expand Down Expand Up @@ -289,6 +289,8 @@ def __new__(
context.set_datastore_policy(datastore_policy)
context.set_retry_state(retry)

context.global_cache_flush_keys = global_cache_flush_keys

return context

def new(self, **kwargs):
Expand Down Expand Up @@ -327,25 +329,6 @@ def use(self):
_state.toplevel_context = None
_state.context = prev_context

@tasklets.tasklet
def _clear_global_cache(self):
"""Clears the global cache.
Clears keys from the global cache that appear in the local context
cache. In this way, only keys that were touched in the current context
are affected.
"""
# Prevent circular import in Python 2.7
from google.cloud.ndb import _cache

keys = [
_cache.global_cache_key(key._key)
for key in self.cache.keys()
if self._use_global_cache(key)
]
if keys:
yield [_cache.global_delete(key) for key in keys]

def _use_cache(self, key, options=None):
"""Return whether to use the context cache for this key."""
flag = options.use_cache if options else None
Expand Down
38 changes: 38 additions & 0 deletions tests/unit/test__datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,27 @@ class SomeKind(model.Model):

assert global_cache.get([cache_key]) == [None]

@staticmethod
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
def test_w_transaction(Batch, global_cache):
class SomeKind(model.Model):
pass

context = context_module.get_context()
with context.new(transaction=b"abc123").use() as in_context:
in_context.global_cache_flush_keys = set()
key = key_module.Key("SomeKind", 1)
cache_key = _cache.global_cache_key(key._key)

entity = SomeKind(key=key)
batch = Batch.return_value
batch.put.return_value = future_result(None)

future = _api.put(model._entity_to_ds_entity(entity), _options.Options())
assert future.result() is None

assert in_context.global_cache_flush_keys == {cache_key}

@staticmethod
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
def test_no_datastore(Batch, global_cache):
Expand Down Expand Up @@ -818,6 +839,23 @@ def test_cache_enabled(Batch, global_cache):

assert global_cache.get([cache_key]) == [None]

@staticmethod
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
def test_w_transaction(Batch, global_cache):
context = context_module.get_context()
with context.new(transaction=b"abc123").use() as in_context:
in_context.global_cache_flush_keys = set()
key = key_module.Key("SomeKind", 1)
cache_key = _cache.global_cache_key(key._key)

batch = Batch.return_value
batch.delete.return_value = future_result(None)

future = _api.delete(key._key, _options.Options())
assert future.result() is None

assert in_context.global_cache_flush_keys == {cache_key}

@staticmethod
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
def test_without_datastore(Batch, global_cache):
Expand Down
31 changes: 31 additions & 0 deletions tests/unit/test__transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from google.cloud.ndb import tasklets
from google.cloud.ndb import _transaction

from . import utils


class Test_in_transaction:
@staticmethod
Expand Down Expand Up @@ -405,6 +407,35 @@ def callback():

assert future.result() == "I tried, momma."

@staticmethod
@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._cache")
@mock.patch("google.cloud.ndb._datastore_api")
def test_success_flush_keys(_datastore_api, _cache):
def callback():
context = context_module.get_context()
context.global_cache_flush_keys.add(b"abc123")
return "I tried, momma."

_cache.global_delete.return_value = utils.future_result(None)

begin_future = tasklets.Future("begin transaction")
_datastore_api.begin_transaction.return_value = begin_future

commit_future = tasklets.Future("commit transaction")
_datastore_api.commit.return_value = commit_future

future = _transaction.transaction_async(callback, retries=0)

_datastore_api.begin_transaction.assert_called_once_with(False, retries=0)
begin_future.set_result(b"tx123")

_datastore_api.commit.assert_called_once_with(b"tx123", retries=0)
commit_future.set_result(None)

assert future.result() == "I tried, momma."
_cache.global_delete.assert_called_once_with(b"abc123")

@staticmethod
@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._datastore_api")
Expand Down
27 changes: 5 additions & 22 deletions tests/unit/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
except ImportError: # pragma: NO PY3 COVER
import mock

from google.cloud.ndb import _cache
from google.cloud.ndb import context as context_module
from google.cloud.ndb import _eventloop
from google.cloud.ndb import exceptions
from google.cloud.ndb import global_cache
from google.cloud.ndb import key as key_module
from google.cloud.ndb import model
from google.cloud.ndb import _options
Expand Down Expand Up @@ -95,6 +93,11 @@ def test_new_transaction(self):
assert new_context.transaction == "tx123"
assert context.transaction is None

def test_new_global_cache_flush_keys(self):
context = self._make_one(global_cache_flush_keys={"hi", "mom!"})
new_context = context.new()
assert new_context.global_cache_flush_keys == {"hi", "mom!"}

def test_new_with_cache(self):
context = self._make_one()
context.cache["foo"] = "bar"
Expand Down Expand Up @@ -128,26 +131,6 @@ def test_clear_cache(self):
context.clear_cache()
assert not context.cache

def test__clear_global_cache(self):
context = self._make_one(global_cache=global_cache._InProcessGlobalCache())
with context.use():
key = key_module.Key("SomeKind", 1)
cache_key = _cache.global_cache_key(key._key)
context.cache[key] = "testdata"
context.global_cache.cache[cache_key] = "testdata"
context.global_cache.cache["anotherkey"] = "otherdata"
context._clear_global_cache().result()

assert context.global_cache.cache == {"anotherkey": "otherdata"}

def test__clear_global_cache_nothing_to_do(self):
context = self._make_one(global_cache=global_cache._InProcessGlobalCache())
with context.use():
context.global_cache.cache["anotherkey"] = "otherdata"
context._clear_global_cache().result()

assert context.global_cache.cache == {"anotherkey": "otherdata"}

def test_flush(self):
eventloop = mock.Mock(spec=("run",))
context = self._make_one(eventloop=eventloop)
Expand Down

0 comments on commit 73020ed

Please sign in to comment.