Skip to content

Commit 49c84fa

Browse files
committed
LITE-31232 Init implementation of bulk_relate_cqrs_serialization
1 parent 1d122bf commit 49c84fa

File tree

5 files changed

+104
-6
lines changed

5 files changed

+104
-6
lines changed

dj_cqrs/mixins.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2024 Ingram Micro Inc. All rights reserved.
22

33
import logging
44

@@ -20,6 +20,7 @@
2020
from dj_cqrs.managers import MasterManager, ReplicaManager
2121
from dj_cqrs.metas import MasterMeta, ReplicaMeta
2222
from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update
23+
from dj_cqrs.state import cqrs_state
2324

2425

2526
logger = logging.getLogger('django-cqrs')
@@ -292,9 +293,16 @@ def _class_serialization(self, using, sync=False):
292293
if sync:
293294
instance = self
294295
else:
296+
instance = None
295297
db = using if using is not None else self._state.db
296-
qs = self.__class__._default_manager.using(db)
297-
instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)
298+
299+
bulk_relate_cm = cqrs_state.bulk_relate_cm
300+
if bulk_relate_cm:
301+
instance = bulk_relate_cm.get_cached_instance(self, db)
302+
303+
if not instance:
304+
qs = self.__class__._default_manager.using(db)
305+
instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)
298306

299307
data = self._cqrs_serializer_cls(instance).data
300308
data['cqrs_revision'] = instance.cqrs_revision

dj_cqrs/signals.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2024 Ingram Micro Inc. All rights reserved.
22

33
import logging
44

@@ -9,6 +9,7 @@
99
from dj_cqrs.constants import SignalType
1010
from dj_cqrs.controller import producer
1111
from dj_cqrs.dataclasses import TransportPayload
12+
from dj_cqrs.state import cqrs_state
1213
from dj_cqrs.utils import get_message_expiration_dt
1314

1415

@@ -64,6 +65,10 @@ def post_save(cls, sender, **kwargs):
6465

6566
using = kwargs['using']
6667

68+
bulk_relate_cm = cqrs_state.bulk_relate_cm
69+
if bulk_relate_cm:
70+
bulk_relate_cm.register(instance, using)
71+
6772
sync = kwargs.get('sync', False)
6873
queue = kwargs.get('queue', None)
6974

dj_cqrs/state.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Copyright © 2024 Ingram Micro Inc. All rights reserved.
2+
3+
import threading
4+
5+
6+
cqrs_state = threading.local()
7+
cqrs_state.bulk_relate_cm = None

dj_cqrs/utils.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2024 Ingram Micro Inc. All rights reserved.
22

33
import logging
4+
from collections import defaultdict
5+
from contextlib import ContextDecorator
46
from datetime import date, datetime, timedelta
57
from uuid import UUID
68

@@ -10,6 +12,7 @@
1012

1113
from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
1214
from dj_cqrs.logger import install_last_query_capturer
15+
from dj_cqrs.state import cqrs_state
1316

1417

1518
logger = logging.getLogger('django-cqrs')
@@ -80,3 +83,55 @@ def apply_query_timeouts(model_cls): # pragma: no cover
8083
cursor.execute(statement, params=(query_timeout,))
8184

8285
install_last_query_capturer(model_cls)
86+
87+
88+
class _BulkRelateCM(ContextDecorator):
89+
def __init__(self, cqrs_id=None):
90+
self._cqrs_id = cqrs_id
91+
self._mapping = defaultdict(lambda: defaultdict(list))
92+
self._cache = {}
93+
94+
def register(self, instance, using):
95+
instance_cqrs_id = getattr(instance, 'CQRS_ID', None)
96+
if self._cqrs_id and instance_cqrs_id != self._cqrs_id:
97+
return
98+
99+
self._mapping[instance_cqrs_id][using].append(instance.pk)
100+
101+
def get_cached_instance(self, instance, using):
102+
instance_cqrs_id = getattr(instance, 'CQRS_ID', None)
103+
if self._cqrs_id and instance_cqrs_id != self._cqrs_id:
104+
return
105+
106+
instance_pk = instance.pk
107+
cached_instances = self._cache.get(instance_cqrs_id, {}).get(using, {})
108+
if cached_instances:
109+
return cached_instances.get(instance_pk)
110+
111+
cached_pks = self._mapping[instance_cqrs_id][using]
112+
if not cached_pks:
113+
return
114+
115+
qs = instance.__class__._default_manager.using(using)
116+
instances_cache = {
117+
instance.pk: instance
118+
for instance in instance.__class__.relate_cqrs_serialization(qs).filter(
119+
pk__in=cached_pks,
120+
).order_by().all()
121+
}
122+
self._cache.update({
123+
instance_cqrs_id: {
124+
using: instances_cache,
125+
},
126+
})
127+
return instances_cache.get(instance_pk)
128+
129+
def __enter__(self):
130+
cqrs_state.bulk_relate_cm = self
131+
132+
def __exit__(self, exc_type, exc_val, exc_tb):
133+
cqrs_state.bulk_relate_cm = None
134+
135+
136+
def bulk_relate_cqrs_serialization(cqrs_id=None):
137+
return _BulkRelateCM(cqrs_id=cqrs_id)

tests/test_master/test_signals.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2024 Ingram Micro Inc. All rights reserved.
22

33
from datetime import datetime, timezone
44

@@ -8,6 +8,7 @@
88

99
from dj_cqrs.constants import SignalType
1010
from dj_cqrs.signals import post_bulk_create, post_update
11+
from dj_cqrs.utils import bulk_relate_cqrs_serialization
1112
from tests.dj_master import models
1213
from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args
1314

@@ -127,6 +128,28 @@ def test_manual_post_bulk_create(mocker):
127128
assert publisher_mock.call_count == 3
128129

129130

131+
@pytest.mark.django_db(transaction=True)
132+
@pytest.mark.parametrize('count', (1, 3, 5))
133+
def test_bulk_relate_cqrs_serialization(
134+
django_assert_num_queries,
135+
django_v_trans_q_count_sup,
136+
mocker,
137+
count,
138+
):
139+
mocker.patch('dj_cqrs.controller.producer.produce')
140+
141+
opt_query_count = count + 2 + django_v_trans_q_count_sup
142+
with django_assert_num_queries(opt_query_count):
143+
with bulk_relate_cqrs_serialization():
144+
with transaction.atomic(savepoint=False):
145+
[models.Author.objects.create(id=i) for i in range(count)]
146+
147+
not_opt_query_count = count + count * 2 + django_v_trans_q_count_sup
148+
with django_assert_num_queries(not_opt_query_count):
149+
with transaction.atomic(savepoint=False):
150+
[models.Author.objects.create(id=10 + i) for i in range(count)]
151+
152+
130153
@pytest.mark.django_db(transaction=True)
131154
def test_automatic_post_bulk_create(mocker):
132155
publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')

0 commit comments

Comments
 (0)