Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up chain cover calculation #9176

Merged
merged 6 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Batch up calls to DB for allocating new chain IDs
  • Loading branch information
erikjohnston committed Jan 20, 2021
commit 117953dad5ed206826bfb7c0f321f635a9b7d567
50 changes: 39 additions & 11 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ def _add_chain_cover_index(
events_to_calc_chain_id_for,
chain_map,
)
chain_map.update(new_chain_tuples)

db_pool.simple_insert_many_txn(
txn,
Expand Down Expand Up @@ -792,8 +793,6 @@ def _allocate_chain_ids(
# new chain if the sequence number has already been allocated.
#

new_chain_tuples = {} # type: Dict[str, Tuple[int, int]]

existing_chains = set() # type: Set[int]
tree = [] # type: List[Tuple[str, Optional[str]]]

Expand Down Expand Up @@ -826,14 +825,28 @@ def _allocate_chain_ids(
)
txn.execute(sql % (clause,), args)

chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[int, int]
chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int]

# Allocate the new events chain ID/sequence numbers.
for event_id, prev_event_id in tree:
if prev_event_id:
existing_chain_id = chain_map[prev_event_id]

new_chain_tuple = None
#
# To reduce the number of calls to the database we don't allocate a
# chain ID number in the loop, instead we use a temporary `object()` for
# each new chain ID. Once we've done the loop we generate the necessary
# number of new chain IDs in one call, replacing all temporary
# objects with real allocated chain IDs.

unallocated_chain_ids = set() # type: Set[object]
new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]]
for event_id, auth_event_id in tree:
# If we reference an auth_event_id we fetch the allocated chain ID,
# either from the existing `chain_map` or the newly generated
# `new_chain_tuples` map.
if auth_event_id:
existing_chain_id = new_chain_tuples.get(auth_event_id)
if not existing_chain_id:
existing_chain_id = chain_map[auth_event_id]

new_chain_tuple = None # type: Optional[Tuple[Any, int]]
if existing_chain_id:
# We found a chain ID/sequence number candidate, check it's
# not already taken.
Expand All @@ -846,14 +859,29 @@ def _allocate_chain_ids(
proposed_new_seq,
)

# If we need to start a new chain we allocate a temporary chain ID.
if not new_chain_tuple:
new_chain_tuple = (db_pool.event_chain_id_gen.get_next_id_txn(txn), 1)
new_chain_tuple = (object(), 1)
unallocated_chain_ids.add(new_chain_tuple[0])

chain_map[event_id] = new_chain_tuple
new_chain_tuples[event_id] = new_chain_tuple
chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]

return new_chain_tuples
# Generate new chain IDs for all unallocated chain IDs.
newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
txn, len(unallocated_chain_ids)
)

# Map from potentially temporary chain ID to real chain ID
chain_id_to_allocated_map = dict(
zip(unallocated_chain_ids, newly_allocated_chain_ids)
) # type: Dict[Any, int]
chain_id_to_allocated_map.update((c, c) for c in existing_chains)

return {
event_id: (chain_id_to_allocated_map[chain_id], seq)
for event_id, (chain_id, seq) in new_chain_tuples.items()
}

def _persist_transaction_ids_txn(
self,
Expand Down
16 changes: 16 additions & 0 deletions synapse/storage/util/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def get_next_id_txn(self, txn: Cursor) -> int:
"""Gets the next ID in the sequence"""
...

@abc.abstractmethod
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
"""Get the next `n` IDs in the sequence.

Batch counterpart of `get_next_id_txn`, so that a caller needing several
IDs can request them with one call rather than one call per ID.

Args:
    txn: Cursor the implementation may use to talk to the database.
    n: How many IDs to allocate.

Returns:
    A list of the newly allocated IDs.
"""
...

@abc.abstractmethod
def check_consistency(
self,
Expand Down Expand Up @@ -174,6 +179,17 @@ def get_next_id_txn(self, txn: Cursor) -> int:
self._current_max_id += 1
return self._current_max_id

def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
    """Get the next `n` IDs in the sequence.

    The whole batch is reserved while holding `self._lock`, so the returned
    IDs are consecutive.

    Args:
        txn: Handed to the one-shot `self._callback` to fetch the current
            maximum ID the first time an ID is requested; otherwise unused.
        n: Number of IDs to allocate. Zero returns an empty list and
            leaves the sequence position unchanged.

    Returns:
        The `n` newly reserved IDs, in ascending consecutive order.

    Raises:
        ValueError: If `n` is negative.
    """
    if n < 0:
        # A negative count would move `_current_max_id` backwards, causing
        # IDs that were already handed out to be issued a second time.
        raise ValueError("Cannot allocate a negative number of IDs: %r" % (n,))

    with self._lock:
        if self._current_max_id is None:
            # Lazily bootstrap the current maximum on first use, then drop
            # the callback so it is never invoked again.
            assert self._callback is not None
            self._current_max_id = self._callback(txn)
            self._callback = None

        first_id = self._current_max_id + 1
        self._current_max_id += n
        return list(range(first_id, first_id + n))

def check_consistency(
self, db_conn: Connection, table: str, id_column: str, positive: bool = True
):
Expand Down