|
23 | 23 | from typing import ( |
24 | 24 | TYPE_CHECKING, |
25 | 25 | Any, |
| 26 | + Awaitable, |
26 | 27 | Callable, |
27 | 28 | Collection, |
28 | 29 | Dict, |
|
80 | 81 | make_deferred_yieldable, |
81 | 82 | run_in_background, |
82 | 83 | ) |
83 | | -from synapse.metrics.background_process_metrics import run_as_background_process |
| 84 | +from synapse.metrics.background_process_metrics import ( |
| 85 | + run_as_background_process as _run_as_background_process, |
| 86 | +) |
84 | 87 | from synapse.module_api.callbacks.account_validity_callbacks import ( |
85 | 88 | IS_USER_EXPIRED_CALLBACK, |
86 | 89 | ON_LEGACY_ADMIN_REQUEST, |
|
158 | 161 | from synapse.util.frozenutils import freeze |
159 | 162 |
|
160 | 163 | if TYPE_CHECKING: |
| 164 | + # Old versions don't have `LiteralString` |
| 165 | + from typing_extensions import LiteralString |
| 166 | + |
161 | 167 | from synapse.app.generic_worker import GenericWorkerStore |
162 | 168 | from synapse.server import HomeServer |
163 | 169 |
|
@@ -216,6 +222,60 @@ class UserIpAndAgent: |
216 | 222 | last_seen: int |
217 | 223 |
|
218 | 224 |
|
| 225 | +def run_as_background_process( |
| 226 | + desc: "LiteralString", |
| 227 | + func: Callable[..., Awaitable[Optional[T]]], |
| 228 | + *args: Any, |
| 229 | + bg_start_span: bool = True, |
| 230 | + **kwargs: Any, |
| 231 | +) -> "defer.Deferred[Optional[T]]": |
| 232 | + """ |
| 233 | + XXX: Deprecated: use `ModuleApi.run_as_background_process` instead. |
| 234 | +
|
| 235 | + Run the given function in its own logcontext, with resource metrics |
| 236 | +
|
| 237 | + This should be used to wrap processes which are fired off to run in the |
| 238 | + background, instead of being associated with a particular request. |
| 239 | +
|
| 240 | + It returns a Deferred which completes when the function completes, but it doesn't |
| 241 | + follow the synapse logcontext rules, which makes it appropriate for passing to |
| 242 | + clock.looping_call and friends (or for firing-and-forgetting in the middle of a |
| 243 | + normal synapse async function). |
| 244 | +
|
| 245 | + Args: |
| 246 | + desc: a description for this background process type |
| 247 | + server_name: The homeserver name that this background process is being run for |
| 248 | + (this should be `hs.hostname`). |
| 249 | + func: a function, which may return a Deferred or a coroutine |
| 250 | + bg_start_span: Whether to start an opentracing span. Defaults to True. |
| 251 | + Should only be disabled for processes that will not log to or tag |
| 252 | + a span. |
| 253 | + args: positional args for func |
| 254 | + kwargs: keyword args for func |
| 255 | +
|
| 256 | + Returns: |
| 257 | + Deferred which returns the result of func, or `None` if func raises. |
| 258 | + Note that the returned Deferred does not follow the synapse logcontext |
| 259 | + rules. |
| 260 | + """ |
| 261 | + |
| 262 | + logger.warning( |
| 263 | + "Using deprecated `run_as_background_process` that's exported from the Module API. " |
| 264 | + "Prefer `ModuleApi.run_as_background_process` instead.", |
| 265 | + ) |
| 266 | + |
| 267 | + stub_server_name = "synapse_module" |
| 268 | + |
| 269 | + return _run_as_background_process( |
| 270 | + desc, |
| 271 | + stub_server_name, |
| 272 | + func, |
| 273 | + *args, |
| 274 | + bg_start_span=bg_start_span, |
| 275 | + **kwargs, |
| 276 | + ) |
| 277 | + |
| 278 | + |
219 | 279 | def cached( |
220 | 280 | *, |
221 | 281 | max_entries: int = 1000, |
@@ -1323,10 +1383,9 @@ def looping_background_call( |
1323 | 1383 |
|
1324 | 1384 | if self._hs.config.worker.run_background_tasks or run_on_all_instances: |
1325 | 1385 | self._clock.looping_call( |
1326 | | - run_as_background_process, |
| 1386 | + self.run_as_background_process, |
1327 | 1387 | msec, |
1328 | 1388 | desc, |
1329 | | - self.server_name, |
1330 | 1389 | lambda: maybe_awaitable(f(*args, **kwargs)), |
1331 | 1390 | ) |
1332 | 1391 | else: |
@@ -1382,9 +1441,8 @@ def delayed_background_call( |
1382 | 1441 | return self._clock.call_later( |
1383 | 1442 | # convert ms to seconds as needed by call_later. |
1384 | 1443 | msec * 0.001, |
1385 | | - run_as_background_process, |
| 1444 | + self.run_as_background_process, |
1386 | 1445 | desc, |
1387 | | - self.server_name, |
1388 | 1446 | lambda: maybe_awaitable(f(*args, **kwargs)), |
1389 | 1447 | ) |
1390 | 1448 |
|
@@ -1590,6 +1648,44 @@ async def get_room_state( |
1590 | 1648 |
|
1591 | 1649 | return {key: state_events[event_id] for key, event_id in state_ids.items()} |
1592 | 1650 |
|
| 1651 | + def run_as_background_process( |
| 1652 | + self, |
| 1653 | + desc: "LiteralString", |
| 1654 | + func: Callable[..., Awaitable[Optional[T]]], |
| 1655 | + *args: Any, |
| 1656 | + bg_start_span: bool = True, |
| 1657 | + **kwargs: Any, |
| 1658 | + ) -> "defer.Deferred[Optional[T]]": |
| 1659 | + """Run the given function in its own logcontext, with resource metrics |
| 1660 | +
|
| 1661 | + This should be used to wrap processes which are fired off to run in the |
| 1662 | + background, instead of being associated with a particular request. |
| 1663 | +
|
| 1664 | + It returns a Deferred which completes when the function completes, but it doesn't |
| 1665 | + follow the synapse logcontext rules, which makes it appropriate for passing to |
| 1666 | + clock.looping_call and friends (or for firing-and-forgetting in the middle of a |
| 1667 | + normal synapse async function). |
| 1668 | +
|
| 1669 | + Args: |
| 1670 | + desc: a description for this background process type |
| 1671 | + server_name: The homeserver name that this background process is being run for |
| 1672 | + (this should be `hs.hostname`). |
| 1673 | + func: a function, which may return a Deferred or a coroutine |
| 1674 | + bg_start_span: Whether to start an opentracing span. Defaults to True. |
| 1675 | + Should only be disabled for processes that will not log to or tag |
| 1676 | + a span. |
| 1677 | + args: positional args for func |
| 1678 | + kwargs: keyword args for func |
| 1679 | +
|
| 1680 | + Returns: |
| 1681 | + Deferred which returns the result of func, or `None` if func raises. |
| 1682 | + Note that the returned Deferred does not follow the synapse logcontext |
| 1683 | + rules. |
| 1684 | + """ |
| 1685 | + return _run_as_background_process( |
| 1686 | + desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs |
| 1687 | + ) |
| 1688 | + |
1593 | 1689 | async def defer_to_thread( |
1594 | 1690 | self, |
1595 | 1691 | f: Callable[P, T], |
|
0 commit comments