Skip to content

Commit e84cd4d

Browse files
author
Chris Rossi
authored
fix: defer clearing global cache when in transaction (#660)
* fix: defer clearing global cache when in transaction When in a transaction, keys should only be cleared from the global cache after the transaction has been committed. Fixes #650 #657
1 parent 263cf76 commit e84cd4d

File tree

6 files changed

+92
-46
lines changed

6 files changed

+92
-46
lines changed

packages/google-cloud-ndb/google/cloud/ndb/_datastore_api.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,10 @@ def put(entity, options):
383383
key = None
384384

385385
if use_global_cache:
386-
yield _cache.global_delete(cache_key)
386+
if transaction:
387+
context.global_cache_flush_keys.add(cache_key)
388+
else:
389+
yield _cache.global_delete(cache_key)
387390

388391
raise tasklets.Return(key)
389392

@@ -406,6 +409,7 @@ def delete(key, options):
406409
context = context_module.get_context()
407410
use_global_cache = context._use_global_cache(key, options)
408411
use_datastore = context._use_datastore(key, options)
412+
transaction = context.transaction
409413

410414
if use_global_cache:
411415
cache_key = _cache.global_cache_key(key)
@@ -414,7 +418,6 @@ def delete(key, options):
414418
if use_global_cache:
415419
yield _cache.global_lock(cache_key)
416420

417-
transaction = context.transaction
418421
if transaction:
419422
batch = _get_commit_batch(transaction, options)
420423
else:
@@ -423,7 +426,10 @@ def delete(key, options):
423426
yield batch.delete(key)
424427

425428
if use_global_cache:
426-
yield _cache.global_delete(cache_key)
429+
if transaction:
430+
context.global_cache_flush_keys.add(cache_key)
431+
else:
432+
yield _cache.global_delete(cache_key)
427433

428434

429435
class _NonTransactionalCommitBatch(object):

packages/google-cloud-ndb/google/cloud/ndb/_transaction.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ def transaction_async_(
250250
@tasklets.tasklet
251251
def _transaction_async(context, callback, read_only=False):
252252
# Avoid circular import in Python 2.7
253+
from google.cloud.ndb import _cache
253254
from google.cloud.ndb import _datastore_api
254255

255256
# Start the transaction
@@ -281,6 +282,7 @@ def run_inner_loop(inner_context):
281282

282283
context.eventloop.add_idle(run_inner_loop, tx_context)
283284

285+
tx_context.global_cache_flush_keys = flush_keys = set()
284286
with tx_context.use():
285287
try:
286288
# Run the callback
@@ -301,7 +303,10 @@ def run_inner_loop(inner_context):
301303
yield _datastore_api.rollback(transaction_id)
302304
raise e
303305

304-
tx_context._clear_global_cache()
306+
# Flush keys of entities written during the transaction from the global cache
307+
if flush_keys:
308+
yield [_cache.global_delete(key) for key in flush_keys]
309+
305310
for callback in on_commit_callbacks:
306311
callback()
307312

packages/google-cloud-ndb/google/cloud/ndb/context.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from google.cloud.ndb import _eventloop
2424
from google.cloud.ndb import exceptions
2525
from google.cloud.ndb import key as key_module
26-
from google.cloud.ndb import tasklets
2726

2827

2928
try: # pragma: NO PY2 COVER
@@ -242,6 +241,7 @@ def __new__(
242241
cache=None,
243242
cache_policy=None,
244243
global_cache=None,
244+
global_cache_flush_keys=None,
245245
global_cache_policy=None,
246246
global_cache_timeout_policy=None,
247247
datastore_policy=None,
@@ -289,6 +289,8 @@ def __new__(
289289
context.set_datastore_policy(datastore_policy)
290290
context.set_retry_state(retry)
291291

292+
context.global_cache_flush_keys = global_cache_flush_keys
293+
292294
return context
293295

294296
def new(self, **kwargs):
@@ -327,25 +329,6 @@ def use(self):
327329
_state.toplevel_context = None
328330
_state.context = prev_context
329331

330-
@tasklets.tasklet
331-
def _clear_global_cache(self):
332-
"""Clears the global cache.
333-
334-
Clears keys from the global cache that appear in the local context
335-
cache. In this way, only keys that were touched in the current context
336-
are affected.
337-
"""
338-
# Prevent circular import in Python 2.7
339-
from google.cloud.ndb import _cache
340-
341-
keys = [
342-
_cache.global_cache_key(key._key)
343-
for key in self.cache.keys()
344-
if self._use_global_cache(key)
345-
]
346-
if keys:
347-
yield [_cache.global_delete(key) for key in keys]
348-
349332
def _use_cache(self, key, options=None):
350333
"""Return whether to use the context cache for this key."""
351334
flag = options.use_cache if options else None

packages/google-cloud-ndb/tests/unit/test__datastore_api.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,27 @@ class SomeKind(model.Model):
709709

710710
assert global_cache.get([cache_key]) == [None]
711711

712+
@staticmethod
713+
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
714+
def test_w_transaction(Batch, global_cache):
715+
class SomeKind(model.Model):
716+
pass
717+
718+
context = context_module.get_context()
719+
with context.new(transaction=b"abc123").use() as in_context:
720+
in_context.global_cache_flush_keys = set()
721+
key = key_module.Key("SomeKind", 1)
722+
cache_key = _cache.global_cache_key(key._key)
723+
724+
entity = SomeKind(key=key)
725+
batch = Batch.return_value
726+
batch.put.return_value = future_result(None)
727+
728+
future = _api.put(model._entity_to_ds_entity(entity), _options.Options())
729+
assert future.result() is None
730+
731+
assert in_context.global_cache_flush_keys == {cache_key}
732+
712733
@staticmethod
713734
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
714735
def test_no_datastore(Batch, global_cache):
@@ -818,6 +839,23 @@ def test_cache_enabled(Batch, global_cache):
818839

819840
assert global_cache.get([cache_key]) == [None]
820841

842+
@staticmethod
843+
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
844+
def test_w_transaction(Batch, global_cache):
845+
context = context_module.get_context()
846+
with context.new(transaction=b"abc123").use() as in_context:
847+
in_context.global_cache_flush_keys = set()
848+
key = key_module.Key("SomeKind", 1)
849+
cache_key = _cache.global_cache_key(key._key)
850+
851+
batch = Batch.return_value
852+
batch.delete.return_value = future_result(None)
853+
854+
future = _api.delete(key._key, _options.Options())
855+
assert future.result() is None
856+
857+
assert in_context.global_cache_flush_keys == {cache_key}
858+
821859
@staticmethod
822860
@mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
823861
def test_without_datastore(Batch, global_cache):

packages/google-cloud-ndb/tests/unit/test__transaction.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
from google.cloud.ndb import tasklets
2929
from google.cloud.ndb import _transaction
3030

31+
from . import utils
32+
3133

3234
class Test_in_transaction:
3335
@staticmethod
@@ -405,6 +407,35 @@ def callback():
405407

406408
assert future.result() == "I tried, momma."
407409

410+
@staticmethod
411+
@pytest.mark.usefixtures("in_context")
412+
@mock.patch("google.cloud.ndb._cache")
413+
@mock.patch("google.cloud.ndb._datastore_api")
414+
def test_success_flush_keys(_datastore_api, _cache):
415+
def callback():
416+
context = context_module.get_context()
417+
context.global_cache_flush_keys.add(b"abc123")
418+
return "I tried, momma."
419+
420+
_cache.global_delete.return_value = utils.future_result(None)
421+
422+
begin_future = tasklets.Future("begin transaction")
423+
_datastore_api.begin_transaction.return_value = begin_future
424+
425+
commit_future = tasklets.Future("commit transaction")
426+
_datastore_api.commit.return_value = commit_future
427+
428+
future = _transaction.transaction_async(callback, retries=0)
429+
430+
_datastore_api.begin_transaction.assert_called_once_with(False, retries=0)
431+
begin_future.set_result(b"tx123")
432+
433+
_datastore_api.commit.assert_called_once_with(b"tx123", retries=0)
434+
commit_future.set_result(None)
435+
436+
assert future.result() == "I tried, momma."
437+
_cache.global_delete.assert_called_once_with(b"abc123")
438+
408439
@staticmethod
409440
@pytest.mark.usefixtures("in_context")
410441
@mock.patch("google.cloud.ndb._datastore_api")

packages/google-cloud-ndb/tests/unit/test_context.py

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919
except ImportError: # pragma: NO PY3 COVER
2020
import mock
2121

22-
from google.cloud.ndb import _cache
2322
from google.cloud.ndb import context as context_module
2423
from google.cloud.ndb import _eventloop
2524
from google.cloud.ndb import exceptions
26-
from google.cloud.ndb import global_cache
2725
from google.cloud.ndb import key as key_module
2826
from google.cloud.ndb import model
2927
from google.cloud.ndb import _options
@@ -95,6 +93,11 @@ def test_new_transaction(self):
9593
assert new_context.transaction == "tx123"
9694
assert context.transaction is None
9795

96+
def test_new_global_cache_flush_keys(self):
97+
context = self._make_one(global_cache_flush_keys={"hi", "mom!"})
98+
new_context = context.new()
99+
assert new_context.global_cache_flush_keys == {"hi", "mom!"}
100+
98101
def test_new_with_cache(self):
99102
context = self._make_one()
100103
context.cache["foo"] = "bar"
@@ -128,26 +131,6 @@ def test_clear_cache(self):
128131
context.clear_cache()
129132
assert not context.cache
130133

131-
def test__clear_global_cache(self):
132-
context = self._make_one(global_cache=global_cache._InProcessGlobalCache())
133-
with context.use():
134-
key = key_module.Key("SomeKind", 1)
135-
cache_key = _cache.global_cache_key(key._key)
136-
context.cache[key] = "testdata"
137-
context.global_cache.cache[cache_key] = "testdata"
138-
context.global_cache.cache["anotherkey"] = "otherdata"
139-
context._clear_global_cache().result()
140-
141-
assert context.global_cache.cache == {"anotherkey": "otherdata"}
142-
143-
def test__clear_global_cache_nothing_to_do(self):
144-
context = self._make_one(global_cache=global_cache._InProcessGlobalCache())
145-
with context.use():
146-
context.global_cache.cache["anotherkey"] = "otherdata"
147-
context._clear_global_cache().result()
148-
149-
assert context.global_cache.cache == {"anotherkey": "otherdata"}
150-
151134
def test_flush(self):
152135
eventloop = mock.Mock(spec=("run",))
153136
context = self._make_one(eventloop=eventloop)

0 commit comments

Comments
 (0)