Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit eaf977e

Browse files
realtyemH-Shay
authored andcommitted
Remove need for worker_main_http_uri setting to use /keys/upload. (#14400)
1 parent 70d00a0 commit eaf977e

File tree

7 files changed

+130
-127
lines changed

7 files changed

+130
-127
lines changed

changelog.d/14400.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication.

docker/configure_workers_and_start.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,7 @@
213213
"listener_resources": ["client", "replication"],
214214
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
215215
"shared_extra_conf": {},
216-
"worker_extra_conf": (
217-
"worker_main_http_uri: http://127.0.0.1:%d"
218-
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
219-
),
216+
"worker_extra_conf": "",
220217
},
221218
"account_data": {
222219
"app": "synapse.app.generic_worker",

docs/workers.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ In the config file for each worker, you must specify:
135135
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
136136
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
137137
with an `http` listener.
138-
* If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
139-
the main process (`worker_main_http_uri`).
138+
* **Synapse 1.71 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
139+
the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.72 and newer.
140140

141141
For example:
142142

@@ -221,7 +221,6 @@ information.
221221
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
222222

223223
# Encryption requests
224-
# Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri`
225224
^/_matrix/client/(r0|v3|unstable)/keys/query$
226225
^/_matrix/client/(r0|v3|unstable)/keys/changes$
227226
^/_matrix/client/(r0|v3|unstable)/keys/claim$
@@ -376,7 +375,7 @@ responsible for
376375
- persisting them to the DB, and finally
377376
- updating the events stream.
378377

379-
Because load is sharded in this way, you *must* restart all worker instances when
378+
Because load is sharded in this way, you *must* restart all worker instances when
380379
adding or removing event persisters.
381380

382381
An `event_persister` should not be mistaken for an `event_creator`.

synapse/app/generic_worker.py

Lines changed: 2 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@
1414
# limitations under the License.
1515
import logging
1616
import sys
17-
from typing import Dict, List, Optional, Tuple
17+
from typing import Dict, List
1818

19-
from twisted.internet import address
2019
from twisted.web.resource import Resource
2120

2221
import synapse
2322
import synapse.events
24-
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
2523
from synapse.api.urls import (
2624
CLIENT_API_PREFIX,
2725
FEDERATION_PREFIX,
@@ -43,8 +41,6 @@
4341
from synapse.config.server import ListenerConfig
4442
from synapse.federation.transport.server import TransportLayerServer
4543
from synapse.http.server import JsonResource, OptionsResource
46-
from synapse.http.servlet import RestServlet, parse_json_object_from_request
47-
from synapse.http.site import SynapseRequest
4844
from synapse.logging.context import LoggingContext
4945
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
5046
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -70,12 +66,12 @@
7066
versions,
7167
voip,
7268
)
73-
from synapse.rest.client._base import client_patterns
7469
from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
7570
from synapse.rest.client.devices import DevicesRestServlet
7671
from synapse.rest.client.keys import (
7772
KeyChangesServlet,
7873
KeyQueryServlet,
74+
KeyUploadServlet,
7975
OneTimeKeyServlet,
8076
)
8177
from synapse.rest.client.register import (
@@ -132,107 +128,12 @@
132128
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
133129
from synapse.storage.databases.main.user_directory import UserDirectoryStore
134130
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
135-
from synapse.types import JsonDict
136131
from synapse.util import SYNAPSE_VERSION
137132
from synapse.util.httpresourcetree import create_resource_tree
138133

139134
logger = logging.getLogger("synapse.app.generic_worker")
140135

141136

142-
class KeyUploadServlet(RestServlet):
143-
"""An implementation of the `KeyUploadServlet` that responds to read only
144-
requests, but otherwise proxies through to the master instance.
145-
"""
146-
147-
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
148-
149-
def __init__(self, hs: HomeServer):
150-
"""
151-
Args:
152-
hs: server
153-
"""
154-
super().__init__()
155-
self.auth = hs.get_auth()
156-
self.store = hs.get_datastores().main
157-
self.http_client = hs.get_simple_http_client()
158-
self.main_uri = hs.config.worker.worker_main_http_uri
159-
160-
async def on_POST(
161-
self, request: SynapseRequest, device_id: Optional[str]
162-
) -> Tuple[int, JsonDict]:
163-
requester = await self.auth.get_user_by_req(request, allow_guest=True)
164-
user_id = requester.user.to_string()
165-
body = parse_json_object_from_request(request)
166-
167-
if device_id is not None:
168-
# passing the device_id here is deprecated; however, we allow it
169-
# for now for compatibility with older clients.
170-
if requester.device_id is not None and device_id != requester.device_id:
171-
logger.warning(
172-
"Client uploading keys for a different device "
173-
"(logged in as %s, uploading for %s)",
174-
requester.device_id,
175-
device_id,
176-
)
177-
else:
178-
device_id = requester.device_id
179-
180-
if device_id is None:
181-
raise SynapseError(
182-
400, "To upload keys, you must pass device_id when authenticating"
183-
)
184-
185-
if body:
186-
# They're actually trying to upload something, proxy to main synapse.
187-
188-
# Proxy headers from the original request, such as the auth headers
189-
# (in case the access token is there) and the original IP /
190-
# User-Agent of the request.
191-
headers: Dict[bytes, List[bytes]] = {
192-
header: list(request.requestHeaders.getRawHeaders(header, []))
193-
for header in (b"Authorization", b"User-Agent")
194-
}
195-
# Add the previous hop to the X-Forwarded-For header.
196-
x_forwarded_for = list(
197-
request.requestHeaders.getRawHeaders(b"X-Forwarded-For", [])
198-
)
199-
# we use request.client here, since we want the previous hop, not the
200-
# original client (as returned by request.getClientAddress()).
201-
if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
202-
previous_host = request.client.host.encode("ascii")
203-
# If the header exists, add to the comma-separated list of the first
204-
# instance of the header. Otherwise, generate a new header.
205-
if x_forwarded_for:
206-
x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
207-
x_forwarded_for.extend(x_forwarded_for[1:])
208-
else:
209-
x_forwarded_for = [previous_host]
210-
headers[b"X-Forwarded-For"] = x_forwarded_for
211-
212-
# Replicate the original X-Forwarded-Proto header. Note that
213-
# XForwardedForRequest overrides isSecure() to give us the original protocol
214-
# used by the client, as opposed to the protocol used by our upstream proxy
215-
# - which is what we want here.
216-
headers[b"X-Forwarded-Proto"] = [
217-
b"https" if request.isSecure() else b"http"
218-
]
219-
220-
try:
221-
result = await self.http_client.post_json_get_json(
222-
self.main_uri + request.uri.decode("ascii"), body, headers=headers
223-
)
224-
except HttpResponseException as e:
225-
raise e.to_synapse_error() from e
226-
except RequestSendFailed as e:
227-
raise SynapseError(502, "Failed to talk to master") from e
228-
229-
return 200, result
230-
else:
231-
# Just interested in counts.
232-
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
233-
return 200, {"one_time_key_counts": result}
234-
235-
236137
class GenericWorkerSlavedStore(
237138
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
238139
# rather than going via the correct worker.

synapse/config/workers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,13 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
162162
self.worker_name = config.get("worker_name", self.worker_app)
163163
self.instance_name = self.worker_name or "master"
164164

165+
# FIXME: Remove this check after a suitable amount of time.
165166
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
167+
if self.worker_main_http_uri is not None:
168+
logger.warning(
169+
"The config option worker_main_http_uri is unused since Synapse 1.72. "
170+
"It can be safely removed from your configuration."
171+
)
166172

167173
# This option is really only here to support `--manhole` command line
168174
# argument.

synapse/replication/http/devices.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from twisted.web.server import Request
1919

2020
from synapse.http.server import HttpServer
21+
from synapse.http.servlet import parse_json_object_from_request
2122
from synapse.replication.http._base import ReplicationEndpoint
2223
from synapse.types import JsonDict
2324

@@ -78,5 +79,71 @@ async def _handle_request( # type: ignore[override]
7879
return 200, user_devices
7980

8081

82+
class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
83+
"""Ask master to upload keys for the user and send them out over federation to
84+
update other servers.
85+
86+
For now, only the master is permitted to handle key upload requests;
87+
any worker can handle key query requests (since they're read-only).
88+
89+
Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
90+
the main process to accomplish this.
91+
92+
Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
93+
Request format(borrowed and expanded from KeyUploadServlet):
94+
95+
POST /_synapse/replication/upload_keys_for_user
96+
97+
{
98+
"user_id": "<user_id>",
99+
"device_id": "<device_id>",
100+
"keys": {
101+
....this part can be found in KeyUploadServlet in rest/client/keys.py....
102+
}
103+
}
104+
105+
Response is equivalent to ` /_matrix/client/v3/keys/upload` found in KeyUploadServlet
106+
107+
"""
108+
109+
NAME = "upload_keys_for_user"
110+
PATH_ARGS = ()
111+
CACHE = False
112+
113+
def __init__(self, hs: "HomeServer"):
114+
super().__init__(hs)
115+
116+
self.e2e_keys_handler = hs.get_e2e_keys_handler()
117+
self.store = hs.get_datastores().main
118+
self.clock = hs.get_clock()
119+
120+
@staticmethod
121+
async def _serialize_payload( # type: ignore[override]
122+
user_id: str, device_id: str, keys: JsonDict
123+
) -> JsonDict:
124+
125+
return {
126+
"user_id": user_id,
127+
"device_id": device_id,
128+
"keys": keys,
129+
}
130+
131+
async def _handle_request( # type: ignore[override]
132+
self, request: Request
133+
) -> Tuple[int, JsonDict]:
134+
content = parse_json_object_from_request(request)
135+
136+
user_id = content["user_id"]
137+
device_id = content["device_id"]
138+
keys = content["keys"]
139+
140+
results = await self.e2e_keys_handler.upload_keys_for_user(
141+
user_id, device_id, keys
142+
)
143+
144+
return 200, results
145+
146+
81147
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
82148
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
149+
ReplicationUploadKeysForUserRestServlet(hs).register(http_server)

synapse/rest/client/keys.py

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828
from synapse.http.site import SynapseRequest
2929
from synapse.logging.opentracing import log_kv, set_tag
30+
from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
3031
from synapse.rest.client._base import client_patterns, interactive_auth_handler
3132
from synapse.types import JsonDict, StreamToken
3233
from synapse.util.cancellation import cancellable
@@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet):
4344
Content-Type: application/json
4445
4546
{
46-
"device_keys": {
47-
"user_id": "<user_id>",
48-
"device_id": "<device_id>",
49-
"valid_until_ts": <millisecond_timestamp>,
50-
"algorithms": [
51-
"m.olm.curve25519-aes-sha2",
52-
]
53-
"keys": {
54-
"<algorithm>:<device_id>": "<key_base64>",
47+
"device_keys": {
48+
"user_id": "<user_id>",
49+
"device_id": "<device_id>",
50+
"valid_until_ts": <millisecond_timestamp>,
51+
"algorithms": [
52+
"m.olm.curve25519-aes-sha2",
53+
]
54+
"keys": {
55+
"<algorithm>:<device_id>": "<key_base64>",
56+
},
57+
"signatures:" {
58+
"<user_id>" {
59+
"<algorithm>:<device_id>": "<signature_base64>"
60+
}
61+
}
62+
},
63+
"fallback_keys": {
64+
"<algorithm>:<device_id>": "<key_base64>",
65+
"signed_<algorithm>:<device_id>": {
66+
"fallback": true,
67+
"key": "<key_base64>",
68+
"signatures": {
69+
"<user_id>": {
70+
"<algorithm>:<device_id>": "<key_base64>"
71+
}
72+
}
73+
}
74+
}
75+
"one_time_keys": {
76+
"<algorithm>:<key_id>": "<key_base64>"
5577
},
56-
"signatures:" {
57-
"<user_id>" {
58-
"<algorithm>:<device_id>": "<signature_base64>"
59-
} } },
60-
"one_time_keys": {
61-
"<algorithm>:<key_id>": "<key_base64>"
62-
},
6378
}
79+
80+
response, e.g.:
81+
82+
{
83+
"one_time_key_counts": {
84+
"curve25519": 10,
85+
"signed_curve25519": 20
86+
}
87+
}
88+
6489
"""
6590

6691
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -71,6 +96,13 @@ def __init__(self, hs: "HomeServer"):
7196
self.e2e_keys_handler = hs.get_e2e_keys_handler()
7297
self.device_handler = hs.get_device_handler()
7398

99+
if hs.config.worker.worker_app is None:
100+
# if main process
101+
self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
102+
else:
103+
# then a worker
104+
self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
105+
74106
async def on_POST(
75107
self, request: SynapseRequest, device_id: Optional[str]
76108
) -> Tuple[int, JsonDict]:
@@ -109,8 +141,8 @@ async def on_POST(
109141
400, "To upload keys, you must pass device_id when authenticating"
110142
)
111143

112-
result = await self.e2e_keys_handler.upload_keys_for_user(
113-
user_id, device_id, body
144+
result = await self.key_uploader(
145+
user_id=user_id, device_id=device_id, keys=body
114146
)
115147
return 200, result
116148

0 commit comments

Comments
 (0)