This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix bug in account data replication stream. #7656

Merged
merged 4 commits on Jun 9, 2020
1 change: 1 addition & 0 deletions changelog.d/7656.bugfix
@@ -0,0 +1 @@
+Fix bug in account data replication stream.
8 changes: 7 additions & 1 deletion synapse/replication/slave/storage/account_data.py
@@ -24,7 +24,13 @@
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)
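The slaved tracker now follows the live `account_data` table, with `extra_tables` pulling the per-room tables into the same position calculation, so writes to room account data and room tag revisions advance a single shared stream ID. The sketch below (a standalone illustration, not Synapse's actual tracker code) shows the behaviour this presumably relies on: the current position is the maximum `stream_id` across all of the listed tables.

import sqlite3

# Compute the current stream position as the max stream_id across the main
# table and the extra tables; this is an assumption about what
# SlavedIdTracker / StreamIdGenerator are being configured to do here.
def load_current_stream_id(txn, tables):
    current_id = 1
    for table, column in tables:
        txn.execute("SELECT COALESCE(MAX(%s), 1) FROM %s" % (column, table))
        (max_id,) = txn.fetchone()
        current_id = max(current_id, max_id)
    return current_id

conn = sqlite3.connect(":memory:")
for table in ("account_data", "room_account_data", "room_tags_revisions"):
    conn.execute("CREATE TABLE %s (stream_id BIGINT)" % table)
conn.execute("INSERT INTO room_tags_revisions VALUES (7)")

print(load_current_stream_id(conn.cursor(), [
    ("account_data", "stream_id"),
    ("room_account_data", "stream_id"),
    ("room_tags_revisions", "stream_id"),
]))  # prints 7: a per-room table can hold the newest stream ID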
10 changes: 8 additions & 2 deletions synapse/replication/tcp/streams/_base.py
@@ -600,8 +600,14 @@ async def _update_function(
             for stream_id, user_id, room_id, account_data_type in room_results
         )
 
-        # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(room_rows, global_rows))
+        # We need to return a sorted list, so merge them together.
+        #
+        # Note: We order only by the stream ID to work around a bug where the
+        # same stream ID could appear in both `global_rows` and `room_rows`,
+        # leading to a comparison between the data tuples. The comparison could
+        # fail due to attempting to compare the `room_id` which results in a
+        # `TypeError` from comparing a `str` vs `None`.
+        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
         return updates, to_token, limited


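The note above is easiest to see with a small repro. These rows follow the `(stream_id, (user_id, room_id, account_data_type))` shape implied by the surrounding generators, with invented identifiers; on a stream-ID tie the unkeyed merge falls through to comparing the inner tuples and hits the `str` vs `None` comparison, while the keyed merge from the fix compares stream IDs only.

import heapq

# Global account data carries room_id=None; room account data has a real
# room ID. The identifiers are made up for the demo.
global_rows = [(5, ("@alice:example.com", None, "m.push_rules"))]
room_rows = [(5, ("@alice:example.com", "!room:example.com", "m.tag"))]

# Tie on stream ID 5: Python moves on to the inner tuples and ends up
# comparing "!room:example.com" with None, raising TypeError.
try:
    list(heapq.merge(room_rows, global_rows))
except TypeError as exc:
    print("unkeyed merge failed:", exc)

# Keying on the stream ID alone, as the fix does, never inspects the inner
# tuples; tied entries simply come out in input order.
print(list(heapq.merge(room_rows, global_rows, key=lambda row: row[0])))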
16 changes: 15 additions & 1 deletion synapse/storage/data_stores/main/account_data.py
@@ -297,7 +297,13 @@ def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
 class AccountDataStore(AccountDataWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = StreamIdGenerator(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data_max_stream_id",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(AccountDataStore, self).__init__(database, db_conn, hs)
@@ -387,6 +393,10 @@ def add_account_data_for_user(self, user_id, account_data_type, content):
             # doesn't sound any worse than the whole update getting lost,
             # which is what would happen if we combined the two into one
             # transaction.
+            #
+            # Note: This is only here for backwards compat to allow admins to
+            # roll back to a previous Synapse version. Next time we update the
+            # database version we can remove this table.
             yield self._update_max_stream_id(next_id)
 
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -405,6 +415,10 @@ def _update_max_stream_id(self, next_id):
             next_id(int): The revision to advance to.
         """
 
+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
+
         def _update(txn):
             update_max_id_sql = (
                 "UPDATE account_data_max_stream_id"
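The repeated note explains why `_update_max_stream_id` survives at all: `account_data_max_stream_id` is no longer the source of truth, but an older Synapse still reads its stream position from it, so keeping it bumped makes a version rollback safe. A standalone sketch of that dual-write idea follows (the single-row table shape and exact SQL are assumptions, since the real `_update` is truncated above):

import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed shape: a single-row table holding the legacy max stream ID.
conn.execute("CREATE TABLE account_data_max_stream_id (stream_id BIGINT NOT NULL)")
conn.execute("INSERT INTO account_data_max_stream_id VALUES (0)")

def update_max_stream_id(txn, next_id):
    # Bump the legacy table so a rolled-back Synapse, which reads only this
    # table, resumes from a sane stream position.
    txn.execute(
        "UPDATE account_data_max_stream_id SET stream_id = ?", (next_id,)
    )

update_max_stream_id(conn, 42)
print(conn.execute("SELECT stream_id FROM account_data_max_stream_id").fetchone())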
3 changes: 3 additions & 0 deletions synapse/storage/data_stores/main/tags.py
@@ -233,6 +233,9 @@ def _update_revision_txn(self, txn, user_id, room_id, next_id):
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )
 
+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
         update_max_id_sql = (
             "UPDATE account_data_max_stream_id"
             " SET stream_id = ?"
1 change: 1 addition & 0 deletions synapse/storage/prepare_database.py
@@ -33,6 +33,7 @@
 # schema files, so the users will be informed on server restarts.
 # XXX: If you're about to bump this to 59 (or higher) please create an update
 # that drops the unused `cache_invalidation_stream` table, as per #7436!
+# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
 SCHEMA_VERSION = 58
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
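When SCHEMA_VERSION is eventually bumped, the two XXX comments call for a delta that drops both leftover tables. A hypothetical sketch of such a delta (the file location, delta mechanism, and `run_upgrade` signature are assumptions for illustration, not part of this PR):

# e.g. a future schema delta module for version 59+
def run_upgrade(cur, database_engine, *args, **kwargs):
    # Drop the tables flagged as unused once rollback to older Synapse
    # versions no longer needs to be supported (per #7436 and #7656).
    cur.execute("DROP TABLE IF EXISTS cache_invalidation_stream")
    cur.execute("DROP TABLE IF EXISTS account_data_max_stream_id")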