
Commit 8eec25a

Track ongoing event fetches correctly in the presence of failure (#11240)
When an event fetcher aborts due to an exception, `_event_fetch_ongoing` must be decremented, otherwise the event fetcher would never be replaced. If enough event fetchers were to fail, no more events would be fetched and requests would get stuck waiting for events.
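To illustrate the failure mode and the shape of the fix, here is a minimal, self-contained sketch of the same pattern. It is not Synapse's actual code; the class, method, and gauge names are hypothetical stand-ins. The point it demonstrates is that the "ongoing fetchers" counter is decremented in a `finally` block, so a fetcher that dies with an exception still releases its slot and can later be replaced.

```python
# Toy version of the queue + capped-fetcher-pool pattern (hypothetical names,
# not Synapse's implementation).
import threading

from prometheus_client import Gauge

MAX_FETCHERS = 5  # stand-in for an EVENT_QUEUE_THREADS-style cap

fetch_ongoing_gauge = Gauge(
    "example_event_fetch_ongoing",
    "The number of fetcher threads that are running",
)


class EventFetchQueue:
    def __init__(self) -> None:
        self._lock = threading.Condition()
        self._queue: list = []
        self._ongoing = 0
        fetch_ongoing_gauge.set(self._ongoing)

    def enqueue(self, request) -> None:
        """Queue a request and start another fetcher if there is spare capacity."""
        with self._lock:
            self._queue.append(request)
            self._lock.notify()
            if self._ongoing < MAX_FETCHERS:
                self._ongoing += 1
                fetch_ongoing_gauge.set(self._ongoing)
                threading.Thread(target=self._fetch_loop, daemon=True).start()

    def _fetch_loop(self) -> None:
        """Drain the queue; always release this fetcher's slot, even on failure."""
        try:
            while True:
                with self._lock:
                    if not self._queue:
                        return  # done; the `finally` below still runs
                    batch, self._queue = self._queue, []
                self._process(batch)  # may raise, e.g. if the DB connection is lost
        finally:
            # The crux of the fix: without this decrement, a crashed fetcher
            # would be counted as "ongoing" forever, and once enough fetchers
            # had crashed, no new ones would ever be started.
            with self._lock:
                self._ongoing -= 1
                fetch_ongoing_gauge.set(self._ongoing)

    def _process(self, batch) -> None:
        ...  # placeholder for the real work of fetching events
```

A regression test for this kind of bug would typically force the processing step to raise and then assert that the counter has returned to zero, i.e. that a fresh fetcher can still be started afterwards.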
1 parent: a271e23

File tree: 2 files changed (35 additions, 22 deletions)


changelog.d/11240.bugfix (1 addition & 0 deletions)

```diff
@@ -0,0 +1 @@
+Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection.
```

synapse/storage/databases/main/events_worker.py (34 additions & 22 deletions)

```diff
@@ -28,6 +28,7 @@
 
 import attr
 from constantly import NamedConstant, Names
+from prometheus_client import Gauge
 from typing_extensions import Literal
 
 from twisted.internet import defer
@@ -81,6 +82,12 @@
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+event_fetch_ongoing_gauge = Gauge(
+    "synapse_event_fetch_ongoing",
+    "The number of event fetchers that are running",
+)
+
+
 @attr.s(slots=True, auto_attribs=True)
 class _EventCacheEntry:
     event: EventBase
@@ -222,6 +229,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
+        event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
         # We define this sequence here so that it can be referenced from both
         # the DataStore and PersistEventStore.
@@ -732,28 +740,31 @@ def _do_fetch(self, conn: Connection) -> None:
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        i = 0
-        while True:
-            with self._event_fetch_lock:
-                event_list = self._event_fetch_list
-                self._event_fetch_list = []
-
-                if not event_list:
-                    single_threaded = self.database_engine.single_threaded
-                    if (
-                        not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
-                        or single_threaded
-                        or i > EVENT_QUEUE_ITERATIONS
-                    ):
-                        self._event_fetch_ongoing -= 1
-                        return
-                    else:
-                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                        i += 1
-                        continue
-                i = 0
-
-            self._fetch_event_list(conn, event_list)
+        try:
+            i = 0
+            while True:
+                with self._event_fetch_lock:
+                    event_list = self._event_fetch_list
+                    self._event_fetch_list = []
+
+                    if not event_list:
+                        single_threaded = self.database_engine.single_threaded
+                        if (
+                            not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
+                            or single_threaded
+                            or i > EVENT_QUEUE_ITERATIONS
+                        ):
+                            break
+                        else:
+                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                            i += 1
+                            continue
+                    i = 0
+
+                self._fetch_event_list(conn, event_list)
+        finally:
+            self._event_fetch_ongoing -= 1
+            event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
     def _fetch_event_list(
         self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]]
@@ -977,6 +988,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]:
 
             if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
                 self._event_fetch_ongoing += 1
+                event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
                 should_start = True
             else:
                 should_start = False
```
