33
33
import grpc
34
34
35
35
import elasticapm
36
- from elasticapm .contrib .grpc .server_interceptor import _ServicerContextWrapper , _wrap_rpc_behavior , get_trace_parent
36
+ from elasticapm .contrib .grpc .server_interceptor import _ServicerContextWrapper , get_trace_parent
37
37
38
38
39
39
class _AsyncServerInterceptor (grpc .aio .ServerInterceptor ):
40
40
async def intercept_service (self , continuation , handler_call_details ):
41
- def transaction_wrapper (behavior , request_streaming , response_streaming ):
42
- async def _interceptor (request_or_iterator , context ):
43
- if request_streaming or response_streaming : # only unary-unary is supported
44
- return behavior (request_or_iterator , context )
41
+ def wrap_unary_unary (behavior ):
42
+ async def _interceptor (request , context ):
45
43
tp = get_trace_parent (handler_call_details )
46
44
client = elasticapm .get_client ()
47
45
transaction = client .begin_transaction ("request" , trace_parent = tp )
48
46
try :
49
- result = behavior (request_or_iterator , _ServicerContextWrapper (context , transaction ))
47
+ result = behavior (request , _ServicerContextWrapper (context , transaction ))
50
48
51
49
# This is so we can support both sync and async rpc functions
52
50
if inspect .isawaitable (result ):
@@ -65,4 +63,12 @@ async def _interceptor(request_or_iterator, context):
65
63
66
64
return _interceptor
67
65
68
- return _wrap_rpc_behavior (await continuation (handler_call_details ), transaction_wrapper )
66
+ handler = await continuation (handler_call_details )
67
+ if handler .request_streaming or handler .response_streaming :
68
+ return handler
69
+
70
+ return grpc .unary_unary_rpc_method_handler (
71
+ wrap_unary_unary (handler .unary_unary ),
72
+ request_deserializer = handler .request_deserializer ,
73
+ response_serializer = handler .response_serializer ,
74
+ )
0 commit comments