1313# See the License for the specific language governing permissions and
1414# limitations under the License.
1515
16+ import inspect
1617import logging
1718import threading
18- from asyncio import iscoroutine
1919from functools import wraps
2020from typing import TYPE_CHECKING , Dict , Optional , Set
2121
2222from prometheus_client .core import REGISTRY , Counter , Gauge
2323
2424from twisted .internet import defer
25- from twisted .python .failure import Failure
2625
2726from synapse .logging .context import LoggingContext , PreserveLoggingContext
2827
@@ -167,7 +166,7 @@ def update_metrics(self):
167166 )
168167
169168
170- def run_as_background_process (desc , func , * args , ** kwargs ):
169+ def run_as_background_process (desc : str , func , * args , ** kwargs ):
171170 """Run the given function in its own logcontext, with resource metrics
172171
173172 This should be used to wrap processes which are fired off to run in the
@@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
179178 normal synapse inlineCallbacks function).
180179
181180 Args:
182- desc (str) : a description for this background process type
181+ desc: a description for this background process type
183182 func: a function, which may return a Deferred or a coroutine
184183 args: positional args for func
185184 kwargs: keyword args for func
@@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
188187 follow the synapse logcontext rules.
189188 """
190189
191- @defer .inlineCallbacks
192- def run ():
190+ async def run ():
193191 with _bg_metrics_lock :
194192 count = _background_process_counts .get (desc , 0 )
195193 _background_process_counts [desc ] = count + 1
@@ -203,29 +201,21 @@ def run():
203201 try :
204202 result = func (* args , ** kwargs )
205203
206- # We probably don't have an ensureDeferred in our call stack to handle
207- # coroutine results, so we need to ensureDeferred here.
208- #
209- # But we need this check because ensureDeferred doesn't like being
210- # called on immediate values (as opposed to Deferreds or coroutines).
211- if iscoroutine (result ):
212- result = defer .ensureDeferred (result )
204+ if inspect .isawaitable (result ):
205+ result = await result
213206
214- return ( yield result )
207+ return result
215208 except Exception :
216- # failure.Failure() fishes the original Failure out of our stack, and
217- # thus gives us a sensible stack trace.
218- f = Failure ()
219- logger .error (
220- "Background process '%s' threw an exception" ,
221- desc ,
222- exc_info = (f .type , f .value , f .getTracebackObject ()),
209+ logger .exception (
210+ "Background process '%s' threw an exception" , desc ,
223211 )
224212 finally :
225213 _background_process_in_flight_count .labels (desc ).dec ()
226214
227215 with PreserveLoggingContext ():
228- return run ()
216+ # Note that we return a Deferred here so that it can be used in a
217+ # looping_call and other places that expect a Deferred.
218+ return defer .ensureDeferred (run ())
229219
230220
231221def wrap_as_background_process (desc ):
0 commit comments