32
32
33
33
from synapse .api .constants import EventTypes , Membership
34
34
from synapse .metrics import LaterGauge
35
- from synapse .metrics .background_process_metrics import (
36
- run_as_background_process ,
37
- wrap_as_background_process ,
38
- )
35
+ from synapse .metrics .background_process_metrics import wrap_as_background_process
39
36
from synapse .storage ._base import SQLBaseStore , db_to_json , make_in_list_sql_clause
40
37
from synapse .storage .database import (
41
38
DatabasePool ,
@@ -91,16 +88,6 @@ def __init__(
91
88
# at a time. Keyed by room_id.
92
89
self ._joined_host_linearizer = Linearizer ("_JoinedHostsCache" )
93
90
94
- # Is the current_state_events.membership up to date? Or is the
95
- # background update still running?
96
- self ._current_state_events_membership_up_to_date = False
97
-
98
- txn = db_conn .cursor (
99
- txn_name = "_check_safe_current_state_events_membership_updated"
100
- )
101
- self ._check_safe_current_state_events_membership_updated_txn (txn )
102
- txn .close ()
103
-
104
91
if (
105
92
self .hs .config .worker .run_background_tasks
106
93
and self .hs .config .metrics .metrics_flags .known_servers
@@ -157,34 +144,6 @@ def _transact(txn: LoggingTransaction) -> int:
157
144
self ._known_servers_count = max ([count , 1 ])
158
145
return self ._known_servers_count
159
146
160
- def _check_safe_current_state_events_membership_updated_txn (
161
- self , txn : LoggingTransaction
162
- ) -> None :
163
- """Checks if it is safe to assume the new current_state_events
164
- membership column is up to date
165
- """
166
-
167
- pending_update = self .db_pool .simple_select_one_txn (
168
- txn ,
169
- table = "background_updates" ,
170
- keyvalues = {"update_name" : _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME },
171
- retcols = ["update_name" ],
172
- allow_none = True ,
173
- )
174
-
175
- self ._current_state_events_membership_up_to_date = not pending_update
176
-
177
- # If the update is still running, reschedule to run.
178
- if pending_update :
179
- self ._clock .call_later (
180
- 15.0 ,
181
- run_as_background_process ,
182
- "_check_safe_current_state_events_membership_updated" ,
183
- self .db_pool .runInteraction ,
184
- "_check_safe_current_state_events_membership_updated" ,
185
- self ._check_safe_current_state_events_membership_updated_txn ,
186
- )
187
-
188
147
@cached (max_entries = 100000 , iterable = True )
189
148
async def get_users_in_room (self , room_id : str ) -> List [str ]:
190
149
"""
@@ -212,31 +171,14 @@ def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[s
212
171
`get_current_hosts_in_room()` and so we can re-use the cache but it's
213
172
not horrible to have here either.
214
173
"""
215
- # If we can assume current_state_events.membership is up to date
216
- # then we can avoid a join, which is a Very Good Thing given how
217
- # frequently this function gets called.
218
- if self ._current_state_events_membership_up_to_date :
219
- sql = """
220
- SELECT c.state_key FROM current_state_events as c
221
- /* Get the depth of the event from the events table */
222
- INNER JOIN events AS e USING (event_id)
223
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
224
- /* Sorted by lowest depth first */
225
- ORDER BY e.depth ASC;
226
- """
227
- else :
228
- sql = """
229
- SELECT c.state_key FROM room_memberships as m
230
- /* Get the depth of the event from the events table */
231
- INNER JOIN events AS e USING (event_id)
232
- INNER JOIN current_state_events as c
233
- ON m.event_id = c.event_id
234
- AND m.room_id = c.room_id
235
- AND m.user_id = c.state_key
236
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
237
- /* Sorted by lowest depth first */
238
- ORDER BY e.depth ASC;
239
- """
174
+ sql = """
175
+ SELECT c.state_key FROM current_state_events as c
176
+ /* Get the depth of the event from the events table */
177
+ INNER JOIN events AS e USING (event_id)
178
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
179
+ /* Sorted by lowest depth first */
180
+ ORDER BY e.depth ASC;
181
+ """
240
182
241
183
txn .execute (sql , (room_id , Membership .JOIN ))
242
184
return [r [0 ] for r in txn ]
@@ -353,28 +295,14 @@ def _get_room_summary_txn(
353
295
# We do this all in one transaction to keep the cache small.
354
296
# FIXME: get rid of this when we have room_stats
355
297
356
- # If we can assume current_state_events.membership is up to date
357
- # then we can avoid a join, which is a Very Good Thing given how
358
- # frequently this function gets called.
359
- if self ._current_state_events_membership_up_to_date :
360
- # Note, rejected events will have a null membership field, so
361
- # we we manually filter them out.
362
- sql = """
363
- SELECT count(*), membership FROM current_state_events
364
- WHERE type = 'm.room.member' AND room_id = ?
365
- AND membership IS NOT NULL
366
- GROUP BY membership
367
- """
368
- else :
369
- sql = """
370
- SELECT count(*), m.membership FROM room_memberships as m
371
- INNER JOIN current_state_events as c
372
- ON m.event_id = c.event_id
373
- AND m.room_id = c.room_id
374
- AND m.user_id = c.state_key
375
- WHERE c.type = 'm.room.member' AND c.room_id = ?
376
- GROUP BY m.membership
377
- """
298
+ # Note, rejected events will have a null membership field, so
299
+ # we we manually filter them out.
300
+ sql = """
301
+ SELECT count(*), membership FROM current_state_events
302
+ WHERE type = 'm.room.member' AND room_id = ?
303
+ AND membership IS NOT NULL
304
+ GROUP BY membership
305
+ """
378
306
379
307
txn .execute (sql , (room_id ,))
380
308
res : Dict [str , MemberSummary ] = {}
@@ -383,30 +311,18 @@ def _get_room_summary_txn(
383
311
384
312
# we order by membership and then fairly arbitrarily by event_id so
385
313
# heroes are consistent
386
- if self ._current_state_events_membership_up_to_date :
387
- # Note, rejected events will have a null membership field, so
388
- # we we manually filter them out.
389
- sql = """
390
- SELECT state_key, membership, event_id
391
- FROM current_state_events
392
- WHERE type = 'm.room.member' AND room_id = ?
393
- AND membership IS NOT NULL
394
- ORDER BY
395
- CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
396
- event_id ASC
397
- LIMIT ?
398
- """
399
- else :
400
- sql = """
401
- SELECT c.state_key, m.membership, c.event_id
402
- FROM room_memberships as m
403
- INNER JOIN current_state_events as c USING (room_id, event_id)
404
- WHERE c.type = 'm.room.member' AND c.room_id = ?
405
- ORDER BY
406
- CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
407
- c.event_id ASC
408
- LIMIT ?
409
- """
314
+ # Note, rejected events will have a null membership field, so
315
+ # we we manually filter them out.
316
+ sql = """
317
+ SELECT state_key, membership, event_id
318
+ FROM current_state_events
319
+ WHERE type = 'm.room.member' AND room_id = ?
320
+ AND membership IS NOT NULL
321
+ ORDER BY
322
+ CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
323
+ event_id ASC
324
+ LIMIT ?
325
+ """
410
326
411
327
# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
412
328
txn .execute (sql , (room_id , Membership .JOIN , Membership .INVITE , 6 ))
@@ -649,27 +565,15 @@ def _get_rooms_for_user_with_stream_ordering_txn(
649
565
# We use `current_state_events` here and not `local_current_membership`
650
566
# as a) this gets called with remote users and b) this only gets called
651
567
# for rooms the server is participating in.
652
- if self ._current_state_events_membership_up_to_date :
653
- sql = """
654
- SELECT room_id, e.instance_name, e.stream_ordering
655
- FROM current_state_events AS c
656
- INNER JOIN events AS e USING (room_id, event_id)
657
- WHERE
658
- c.type = 'm.room.member'
659
- AND c.state_key = ?
660
- AND c.membership = ?
661
- """
662
- else :
663
- sql = """
664
- SELECT room_id, e.instance_name, e.stream_ordering
665
- FROM current_state_events AS c
666
- INNER JOIN room_memberships AS m USING (room_id, event_id)
667
- INNER JOIN events AS e USING (room_id, event_id)
668
- WHERE
669
- c.type = 'm.room.member'
670
- AND c.state_key = ?
671
- AND m.membership = ?
672
- """
568
+ sql = """
569
+ SELECT room_id, e.instance_name, e.stream_ordering
570
+ FROM current_state_events AS c
571
+ INNER JOIN events AS e USING (room_id, event_id)
572
+ WHERE
573
+ c.type = 'm.room.member'
574
+ AND c.state_key = ?
575
+ AND c.membership = ?
576
+ """
673
577
674
578
txn .execute (sql , (user_id , Membership .JOIN ))
675
579
return frozenset (
@@ -707,27 +611,15 @@ def _get_rooms_for_users_with_stream_ordering_txn(
707
611
user_ids ,
708
612
)
709
613
710
- if self ._current_state_events_membership_up_to_date :
711
- sql = f"""
712
- SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
713
- FROM current_state_events AS c
714
- INNER JOIN events AS e USING (room_id, event_id)
715
- WHERE
716
- c.type = 'm.room.member'
717
- AND c.membership = ?
718
- AND { clause }
719
- """
720
- else :
721
- sql = f"""
722
- SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
723
- FROM current_state_events AS c
724
- INNER JOIN room_memberships AS m USING (room_id, event_id)
725
- INNER JOIN events AS e USING (room_id, event_id)
726
- WHERE
727
- c.type = 'm.room.member'
728
- AND m.membership = ?
729
- AND { clause }
730
- """
614
+ sql = f"""
615
+ SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
616
+ FROM current_state_events AS c
617
+ INNER JOIN events AS e USING (room_id, event_id)
618
+ WHERE
619
+ c.type = 'm.room.member'
620
+ AND c.membership = ?
621
+ AND { clause }
622
+ """
731
623
732
624
txn .execute (sql , [Membership .JOIN ] + args )
733
625
0 commit comments