4545See docs/postgres.md for more information.
4646"""
4747
48+ _INCONSISTENT_STREAM_ERROR = """
49+ Postgres sequence '%(seq)s' is inconsistent with associated stream position
50+ of '%(stream_name)s' in the 'stream_positions' table.
51+
52+ This is likely a programming error and should be reported at
53+ https://github.com/matrix-org/synapse.
54+
55+ A temporary workaround to fix this error is to shut down Synapse (including
56+ any and all workers) and run the following SQL:
57+
58+ DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
59+
60+ This will need to be done every time the server is restarted.
61+ """
62+
4863
4964class SequenceGenerator (metaclass = abc .ABCMeta ):
5065 """A class which generates a unique sequence of integers"""
@@ -60,14 +75,20 @@ def check_consistency(
6075 db_conn : "LoggingDatabaseConnection" ,
6176 table : str ,
6277 id_column : str ,
78+ stream_name : Optional [str ] = None ,
6379 positive : bool = True ,
6480 ):
6581 """Should be called during start up to test that the current value of
6682 the sequence is greater than or equal to the maximum ID in the table.
6783
68- This is to handle various cases where the sequence value can get out
69- of sync with the table, e.g. if Synapse gets rolled back to a previous
84+ This is to handle various cases where the sequence value can get out of
85+ sync with the table, e.g. if Synapse gets rolled back to a previous
7086 version and the rolled forwards again.
87+
88+ If a stream name is given then this will check that any value in the
89+ `stream_positions` table is less than or equal to the current sequence
90+ value. If it isn't then it's likely that streams have been crossed
91+ somewhere (e.g. two ID generators have the same stream name).
7192 """
7293 ...
7394
@@ -93,8 +114,12 @@ def check_consistency(
93114 db_conn : "LoggingDatabaseConnection" ,
94115 table : str ,
95116 id_column : str ,
117+ stream_name : Optional [str ] = None ,
96118 positive : bool = True ,
97119 ):
120+ """See SequenceGenerator.check_consistency for docstring.
121+ """
122+
98123 txn = db_conn .cursor (txn_name = "sequence.check_consistency" )
99124
100125 # First we get the current max ID from the table.
@@ -118,6 +143,18 @@ def check_consistency(
118143 "SELECT last_value, is_called FROM %(seq)s" % {"seq" : self ._sequence_name }
119144 )
120145 last_value , is_called = txn .fetchone ()
146+
147+ # If we have an associated stream check the stream_positions table.
148+ max_in_stream_positions = None
149+ if stream_name :
150+ txn .execute (
151+ "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?" ,
152+ (stream_name ,),
153+ )
154+ row = txn .fetchone ()
155+ if row :
156+ max_in_stream_positions = row [0 ]
157+
121158 txn .close ()
122159
123160 # If `is_called` is False then `last_value` is actually the value that
@@ -138,6 +175,14 @@ def check_consistency(
138175 % {"seq" : self ._sequence_name , "table" : table , "max_id_sql" : table_sql }
139176 )
140177
178+ # If we have values in the stream positions table then they have to be
179+ # less than or equal to `last_value`
180+ if max_in_stream_positions and max_in_stream_positions > last_value :
181+ raise IncorrectDatabaseSetup (
182+ _INCONSISTENT_STREAM_ERROR
183+ % {"seq" : self ._sequence_name , "stream" : stream_name }
184+ )
185+
141186
142187GetFirstCallbackType = Callable [[Cursor ], int ]
143188
@@ -175,7 +220,12 @@ def get_next_id_txn(self, txn: Cursor) -> int:
175220 return self ._current_max_id
176221
177222 def check_consistency (
178- self , db_conn : Connection , table : str , id_column : str , positive : bool = True
223+ self ,
224+ db_conn : Connection ,
225+ table : str ,
226+ id_column : str ,
227+ stream_name : Optional [str ] = None ,
228+ positive : bool = True ,
179229 ):
180230 # There is nothing to do for in memory sequences
181231 pass
0 commit comments