|
24 | 24 | from twisted.internet import defer
|
25 | 25 |
|
26 | 26 | from synapse.api.constants import EventTypes, Membership
|
| 27 | +from synapse.metrics import LaterGauge |
27 | 28 | from synapse.metrics.background_process_metrics import run_as_background_process
|
28 | 29 | from synapse.storage._base import LoggingTransaction
|
| 30 | +from synapse.storage.engines import Sqlite3Engine |
29 | 31 | from synapse.storage.events_worker import EventsWorkerStore
|
30 | 32 | from synapse.types import get_domain_from_id
|
31 | 33 | from synapse.util.async_helpers import Linearizer
|
@@ -74,6 +76,63 @@ def __init__(self, db_conn, hs):
|
74 | 76 | self._check_safe_current_state_events_membership_updated_txn(txn)
|
75 | 77 | txn.close()
|
76 | 78 |
|
| 79 | + if self.hs.config.metrics_flags.known_servers: |
| 80 | + self._known_servers_count = 1 |
| 81 | + self.hs.get_clock().looping_call( |
| 82 | + run_as_background_process, |
| 83 | + 60 * 1000, |
| 84 | + "_count_known_servers", |
| 85 | + self._count_known_servers, |
| 86 | + ) |
| 87 | + self.hs.get_clock().call_later( |
| 88 | + 1000, |
| 89 | + run_as_background_process, |
| 90 | + "_count_known_servers", |
| 91 | + self._count_known_servers, |
| 92 | + ) |
| 93 | + LaterGauge( |
| 94 | + "synapse_federation_known_servers", |
| 95 | + "", |
| 96 | + [], |
| 97 | + lambda: self._known_servers_count, |
| 98 | + ) |
| 99 | + |
| 100 | + @defer.inlineCallbacks |
| 101 | + def _count_known_servers(self): |
| 102 | + """ |
| 103 | + Count the servers that this server knows about. |
| 104 | +
|
| 105 | + The statistic is stored on the class for the |
| 106 | + `synapse_federation_known_servers` LaterGauge to collect. |
| 107 | + """ |
| 108 | + |
| 109 | + def _transact(txn): |
| 110 | + if isinstance(self.database_engine, Sqlite3Engine): |
| 111 | + query = """ |
| 112 | + SELECT COUNT(DISTINCT substr(out.user_id, pos+1)) |
| 113 | + FROM ( |
| 114 | + SELECT rm.user_id as user_id, instr(rm.user_id, ':') |
| 115 | + AS pos FROM room_memberships as rm |
| 116 | + INNER JOIN current_state_events as c ON rm.event_id = c.event_id |
| 117 | + WHERE c.type = 'm.room.member' |
| 118 | + ) as out |
| 119 | + """ |
| 120 | + else: |
| 121 | + query = """ |
| 122 | + SELECT COUNT(DISTINCT split_part(state_key, ':', 2)) |
| 123 | + FROM current_state_events |
| 124 | + WHERE type = 'm.room.member' AND membership = 'join'; |
| 125 | + """ |
| 126 | + txn.execute(query) |
| 127 | + return list(txn)[0][0] |
| 128 | + |
| 129 | + count = yield self.runInteraction("get_known_servers", _transact) |
| 130 | + |
| 131 | + # We always know about ourselves, even if we have nothing in |
| 132 | + # room_memberships (for example, the server is new). |
| 133 | + self._known_servers_count = max([count, 1]) |
| 134 | + return self._known_servers_count |
| 135 | + |
77 | 136 | def _check_safe_current_state_events_membership_updated_txn(self, txn):
|
78 | 137 | """Checks if it is safe to assume the new current_state_events
|
79 | 138 | membership column is up to date
|
|
0 commit comments