|
29 | 29 | )
|
30 | 30 | from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def
|
31 | 31 |
|
32 |
| -_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """ |
33 |
| -The send_federation config option must be disabled in the main |
34 |
| -synapse process before they can be run in a separate worker. |
35 |
| -
|
36 |
| -Please add ``send_federation: false`` to the main config |
37 |
| -""" |
38 |
| - |
39 |
| -_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """ |
40 |
| -The start_pushers config option must be disabled in the main |
41 |
| -synapse process before they can be run in a separate worker. |
42 |
| -
|
43 |
| -Please add ``start_pushers: false`` to the main config |
44 |
| -""" |
45 |
| - |
46 | 32 | _DEPRECATED_WORKER_DUTY_OPTION_USED = """
|
47 | 33 | The '%s' configuration option is deprecated and will be removed in a future
|
48 | 34 | Synapse version. Please use ``%s: name_of_worker`` instead.
|
@@ -182,40 +168,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
182 | 168 | )
|
183 | 169 | )
|
184 | 170 |
|
185 |
| - # Handle federation sender configuration. |
186 |
| - # |
187 |
| - # There are two ways of configuring which instances handle federation |
188 |
| - # sending: |
189 |
| - # 1. The old way where "send_federation" is set to false and running a |
190 |
| - # `synapse.app.federation_sender` worker app. |
191 |
| - # 2. Specifying the workers sending federation in |
192 |
| - # `federation_sender_instances`. |
193 |
| - # |
194 |
| - |
195 |
| - send_federation = config.get("send_federation", True) |
196 |
| - |
197 |
| - federation_sender_instances = config.get("federation_sender_instances") |
198 |
| - if federation_sender_instances is None: |
199 |
| - # Default to an empty list, which means "another, unknown, worker is |
200 |
| - # responsible for it". |
201 |
| - federation_sender_instances = [] |
202 |
| - |
203 |
| - # If no federation sender instances are set we check if |
204 |
| - # `send_federation` is set, which means use master |
205 |
| - if send_federation: |
206 |
| - federation_sender_instances = ["master"] |
207 |
| - |
208 |
| - if self.worker_app == "synapse.app.federation_sender": |
209 |
| - if send_federation: |
210 |
| - # If we're running federation senders, and not using |
211 |
| - # `federation_sender_instances`, then we should have |
212 |
| - # explicitly set `send_federation` to false. |
213 |
| - raise ConfigError( |
214 |
| - _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR |
215 |
| - ) |
216 |
| - |
217 |
| - federation_sender_instances = [self.worker_name] |
218 |
| - |
| 171 | + federation_sender_instances = self._worker_names_performing_this_duty( |
| 172 | + config, |
| 173 | + "send_federation", |
| 174 | + "synapse.app.federation_sender", |
| 175 | + "federation_sender_instances", |
| 176 | + ) |
219 | 177 | self.send_federation = self.instance_name in federation_sender_instances
|
220 | 178 | self.federation_shard_config = ShardedWorkerHandlingConfig(
|
221 | 179 | federation_sender_instances
|
@@ -282,27 +240,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
282 | 240 | )
|
283 | 241 |
|
284 | 242 | # Handle sharded push
|
285 |
| - start_pushers = config.get("start_pushers", True) |
286 |
| - pusher_instances = config.get("pusher_instances") |
287 |
| - if pusher_instances is None: |
288 |
| - # Default to an empty list, which means "another, unknown, worker is |
289 |
| - # responsible for it". |
290 |
| - pusher_instances = [] |
291 |
| - |
292 |
| - # If no pushers instances are set we check if `start_pushers` is |
293 |
| - # set, which means use master |
294 |
| - if start_pushers: |
295 |
| - pusher_instances = ["master"] |
296 |
| - |
297 |
| - if self.worker_app == "synapse.app.pusher": |
298 |
| - if start_pushers: |
299 |
| - # If we're running pushers, and not using |
300 |
| - # `pusher_instances`, then we should have explicitly set |
301 |
| - # `start_pushers` to false. |
302 |
| - raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR) |
303 |
| - |
304 |
| - pusher_instances = [self.instance_name] |
305 |
| - |
| 243 | + pusher_instances = self._worker_names_performing_this_duty( |
| 244 | + config, |
| 245 | + "start_pushers", |
| 246 | + "synapse.app.pusher", |
| 247 | + "pusher_instances", |
| 248 | + ) |
306 | 249 | self.start_pushers = self.instance_name in pusher_instances
|
307 | 250 | self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
|
308 | 251 |
|
@@ -425,6 +368,64 @@ def _should_this_worker_perform_duty(
|
425 | 368 | # (By this point, these are either the same value or only one is not None.)
|
426 | 369 | return bool(new_option_should_run_here or legacy_option_should_run_here)
|
427 | 370 |
|
| 371 | + def _worker_names_performing_this_duty( |
| 372 | + self, |
| 373 | + config: Dict[str, Any], |
| 374 | + legacy_option_name: str, |
| 375 | + legacy_app_name: str, |
| 376 | + modern_instance_list_name: str, |
| 377 | + ) -> List[str]: |
| 378 | + """ |
| 379 | + Retrieves the names of the workers handling a given duty, by either legacy |
| 380 | + option or instance list. |
| 381 | +
|
| 382 | + There are two ways of configuring which instances handle a given duty, e.g. |
| 383 | + for configuring pushers: |
| 384 | +
|
| 385 | + 1. The old way where "start_pushers" is set to false and running a |
| 386 | + `synapse.app.pusher'` worker app. |
| 387 | + 2. Specifying the workers sending federation in `pusher_instances`. |
| 388 | +
|
| 389 | + Args: |
| 390 | + config: settings read from yaml. |
| 391 | + legacy_option_name: the old way of enabling options. e.g. 'start_pushers' |
| 392 | + legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' |
| 393 | + modern_instance_list_name: the string name of the new instance_list. e.g. |
| 394 | + 'pusher_instances' |
| 395 | +
|
| 396 | + Returns: |
| 397 | + A list of worker instance names handling the given duty. |
| 398 | + """ |
| 399 | + |
| 400 | + legacy_option = config.get(legacy_option_name, True) |
| 401 | + |
| 402 | + worker_instances = config.get(modern_instance_list_name) |
| 403 | + if worker_instances is None: |
| 404 | + # Default to an empty list, which means "another, unknown, worker is |
| 405 | + # responsible for it". |
| 406 | + worker_instances = [] |
| 407 | + |
| 408 | + # If no worker instances are set we check if the legacy option |
| 409 | + # is set, which means use the main process. |
| 410 | + if legacy_option: |
| 411 | + worker_instances = ["master"] |
| 412 | + |
| 413 | + if self.worker_app == legacy_app_name: |
| 414 | + if legacy_option: |
| 415 | + # If we're using `legacy_app_name`, and not using |
| 416 | + # `modern_instance_list_name`, then we should have |
| 417 | + # explicitly set `legacy_option_name` to false. |
| 418 | + raise ConfigError( |
| 419 | + f"The '{legacy_option_name}' config option must be disabled in " |
| 420 | + "the main synapse process before they can be run in a separate " |
| 421 | + "worker.\n" |
| 422 | + f"Please add `{legacy_option_name}: false` to the main config.\n", |
| 423 | + ) |
| 424 | + |
| 425 | + worker_instances = [self.worker_name] |
| 426 | + |
| 427 | + return worker_instances |
| 428 | + |
428 | 429 | def read_arguments(self, args: argparse.Namespace) -> None:
|
429 | 430 | # We support a bunch of command line arguments that override options in
|
430 | 431 | # the config. A lot of these options have a worker_* prefix when running
|
|
0 commit comments