Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix module API's get_user_ip_and_agents function when run on workers #11112

Merged
merged 5 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11112.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug which caused the module API's `get_user_ip_and_agents` function to always fail on workers. `get_user_ip_and_agents` was introduced in 1.44.0 and did not function correctly on worker processes at the time.
6 changes: 5 additions & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.storage.state import StateFilter
Expand All @@ -61,6 +62,7 @@
from synapse.util.caches.descriptors import cached

if TYPE_CHECKING:
from synapse.app.generic_worker import GenericWorkerSlavedStore
from synapse.server import HomeServer

"""
Expand Down Expand Up @@ -111,7 +113,9 @@ class ModuleApi:
def __init__(self, hs: "HomeServer", auth_handler):
self._hs = hs

self._store = hs.get_datastore()
# TODO: Fix this type hint once the types for the data stores have been ironed
# out.
self._store: Union[DataStore, "GenericWorkerSlavedStore"] = hs.get_datastore()
self._auth = hs.get_auth()
self._auth_handler = auth_handler
self._server_name = hs.hostname
Expand Down
124 changes: 85 additions & 39 deletions synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,58 @@ async def get_last_client_ip_by_device(

return {(d["user_id"], d["device_id"]): d for d in res}

async def get_user_ip_and_agents(
self, user: UserID, since_ts: int = 0
) -> List[LastConnectionInfo]:
"""Fetch the IPs and user agents for a user since the given timestamp.

Only rows present in the `user_ips` table are read here. The result might be
slightly out of date as client IPs are inserted in batches.

Args:
user: The user for which to fetch IP addresses and user agents.
since_ts: The timestamp after which to fetch IP addresses and user agents,
in milliseconds. The bound is inclusive: rows whose `last_seen` equals
`since_ts` are returned. Defaults to 0.

Returns:
A list of dictionaries, ordered from most recently seen to least recently
seen, each containing:
* `access_token`: The access token used.
* `ip`: The IP address used.
* `user_agent`: The last user agent seen for this access token and IP
address combination.
* `last_seen`: The timestamp at which this access token and IP address
combination was last seen, in milliseconds.

Only the latest user agent for each access token and IP address combination
is available.
"""
# String form of the user ID, bound as a query parameter below.
user_id = user.to_string()

# Transaction callback: runs the query on `txn` and returns every matching row
# as an (access_token, ip, user_agent, last_seen) tuple.
def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]:
txn.execute(
"""
SELECT access_token, ip, user_agent, last_seen FROM user_ips
WHERE last_seen >= ? AND user_id = ?
ORDER BY last_seen
DESC
""",
(since_ts, user_id),
)
return cast(List[Tuple[str, str, str, int]], txn.fetchall())

# Execute the query via the database pool.
rows = await self.db_pool.runInteraction(
desc="get_user_ip_and_agents", func=get_recent
)

# Convert each row tuple into a dictionary, keeping the query's newest-first
# order.
return [
{
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"last_seen": last_seen,
}
for access_token, ip, user_agent, last_seen in rows
]


class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
Expand Down Expand Up @@ -622,49 +674,43 @@ async def get_last_client_ip_by_device(
async def get_user_ip_and_agents(
self, user: UserID, since_ts: int = 0
) -> List[LastConnectionInfo]:
"""Fetch the IPs and user agents for a user since the given timestamp.

Args:
user: The user for which to fetch IP addresses and user agents.
since_ts: The timestamp after which to fetch IP addresses and user agents,
in milliseconds.

Returns:
A list of dictionaries, each containing:
* `access_token`: The access token used.
* `ip`: The IP address used.
* `user_agent`: The last user agent seen for this access token and IP
address combination.
* `last_seen`: The timestamp at which this access token and IP address
combination was last seen, in milliseconds.

Only the latest user agent for each access token and IP address combination
is available.
"""
Fetch IP/User Agent connection since a given timestamp.
"""
user_id = user.to_string()
results: Dict[Tuple[str, str], Tuple[str, int]] = {}
results: Dict[Tuple[str, str], LastConnectionInfo] = {
(connection["access_token"], connection["ip"]): connection
for connection in await super().get_user_ip_and_agents(user, since_ts)
}
Comment on lines +696 to +699
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only downside of this approach (that I can see) is that we could pull data which we don't care about. This doesn't seem worse than what we were doing before, though, so I think that's OK!


# Overlay data that is pending insertion on top of the results from the
# database.
user_id = user.to_string()
for key in self._batch_row_update:
(
uid,
access_token,
ip,
) = key
uid, access_token, ip = key
if uid == user_id:
user_agent, _, last_seen = self._batch_row_update[key]
if last_seen >= since_ts:
results[(access_token, ip)] = (user_agent, last_seen)

def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]:
txn.execute(
"""
SELECT access_token, ip, user_agent, last_seen FROM user_ips
WHERE last_seen >= ? AND user_id = ?
ORDER BY last_seen
DESC
""",
(since_ts, user_id),
)
return cast(List[Tuple[str, str, str, int]], txn.fetchall())

rows = await self.db_pool.runInteraction(
desc="get_user_ip_and_agents", func=get_recent
)
results[(access_token, ip)] = {
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"last_seen": last_seen,
}

results.update(
((access_token, ip), (user_agent, last_seen))
for access_token, ip, user_agent, last_seen in rows
)
Comment on lines -658 to -661
Copy link
Member

@clokep clokep Oct 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were we previously stomping over more recent data (from memory) with data from the database?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be worth adding a test for!

return [
{
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"last_seen": last_seen,
}
for (access_token, ip), (user_agent, last_seen) in results.items()
]
return list(results.values())