Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand Down Expand Up @@ -286,8 +286,12 @@ public Task<Metadata> GetResponseHeadersAsync()
{
if (_responseHeadersTask == null)
{
// Allocate metadata and task only when requested
// Allocate metadata and task only when requested.
_responseHeadersTask = GetResponseHeadersCoreAsync();

// ResponseHeadersAsync could be called inside a client interceptor when a call is wrapped.
// Most people won't use the headers result. Observed exception to avoid unobserved exception event.
_responseHeadersTask.ObserveException();
}

return _responseHeadersTask;
Expand Down
175 changes: 167 additions & 8 deletions test/Grpc.Net.Client.Tests/AsyncUnaryCallTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand All @@ -20,6 +20,7 @@
using System.Net.Http.Headers;
using Greet;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Grpc.Net.Client.Internal;
using Grpc.Net.Client.Tests.Infrastructure;
using Grpc.Shared;
Expand Down Expand Up @@ -239,11 +240,12 @@ public enum ResponseHandleAction
}

[Test]
[TestCase(0, ResponseHandleAction.ResponseAsync)]
[TestCase(0, ResponseHandleAction.ResponseHeadersAsync)]
[TestCase(0, ResponseHandleAction.Dispose)]
[TestCase(1, ResponseHandleAction.Nothing)]
public async Task AsyncUnaryCall_CallFailed_NoUnobservedExceptions(int expectedUnobservedExceptions, ResponseHandleAction action)
[TestCase(0, false, ResponseHandleAction.ResponseAsync)]
[TestCase(0, true, ResponseHandleAction.ResponseAsync)]
[TestCase(0, false, ResponseHandleAction.ResponseHeadersAsync)]
[TestCase(0, false, ResponseHandleAction.Dispose)]
[TestCase(1, false, ResponseHandleAction.Nothing)]
public async Task AsyncUnaryCall_CallFailed_NoUnobservedExceptions(int expectedUnobservedExceptions, bool addClientInterceptor, ResponseHandleAction action)
{
// Arrange
var services = new ServiceCollection();
Expand All @@ -267,7 +269,11 @@ public async Task AsyncUnaryCall_CallFailed_NoUnobservedExceptions(int expectedU
{
throw new Exception("Test error");
});
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);
CallInvoker invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);
if (addClientInterceptor)
{
invoker = invoker.Intercept(new ClientLoggerInterceptor(loggerFactory));
}

// Act
logger.LogDebug("Starting call");
Expand All @@ -282,7 +288,7 @@ public async Task AsyncUnaryCall_CallFailed_NoUnobservedExceptions(int expectedU
// Assert
Assert.AreEqual(expectedUnobservedExceptions, unobservedExceptions.Count);

static async Task MakeGrpcCallAsync(ILogger logger, HttpClientCallInvoker invoker, ResponseHandleAction action)
static async Task MakeGrpcCallAsync(ILogger logger, CallInvoker invoker, ResponseHandleAction action)
{
var runTask = Task.Run(async () =>
{
Expand Down Expand Up @@ -313,4 +319,157 @@ static async Task MakeGrpcCallAsync(ILogger logger, HttpClientCallInvoker invoke
TaskScheduler.UnobservedTaskException -= onUnobservedTaskException;
}
}

private class ClientLoggerInterceptor : Interceptor
{
private readonly ILogger<ClientLoggerInterceptor> _logger;

public ClientLoggerInterceptor(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<ClientLoggerInterceptor>();
}

public override TResponse BlockingUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
{
LogCall(context.Method);
AddCallerMetadata(ref context);

try
{
return continuation(request, context);
}
catch (Exception ex)
{
LogError(ex);
throw;
}
}

public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
LogCall(context.Method);
AddCallerMetadata(ref context);

try
{
var call = continuation(request, context);

return new AsyncUnaryCall<TResponse>(HandleResponse(call.ResponseAsync), call.ResponseHeadersAsync, call.GetStatus, call.GetTrailers, call.Dispose);
}
catch (Exception ex)
{
LogError(ex);
throw;
}
}

private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> t)
{
try
{
var response = await t;
_logger.LogInformation($"Response received: {response}");
return response;
}
catch (Exception ex)
{
LogError(ex);
throw;
}
}

public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
{
LogCall(context.Method);
AddCallerMetadata(ref context);

try
{
return continuation(context);
}
catch (Exception ex)
{
LogError(ex);
throw;
}
}

public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
{
LogCall(context.Method);
AddCallerMetadata(ref context);

try
{
return continuation(request, context);
}
catch (Exception ex)
{
LogError(ex);
throw;
}
}

public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
{
LogCall(context.Method);
AddCallerMetadata(ref context);

try
{
return continuation(context);
}
catch (Exception ex)
{
LogError(ex);
throw;
}
}

private void LogCall<TRequest, TResponse>(Method<TRequest, TResponse> method)
where TRequest : class
where TResponse : class
{
_logger.LogInformation($"Starting call. Name: {method.Name}. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
}

private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
where TRequest : class
where TResponse : class
{
var headers = context.Options.Headers;

// Call doesn't have a headers collection to add to.
// Need to create a new context with headers for the call.
if (headers == null)
{
headers = new Metadata();
var options = context.Options.WithHeaders(headers);
context = new ClientInterceptorContext<TRequest, TResponse>(context.Method, context.Host, options);
}

// Add caller metadata to call headers
headers.Add("caller-user", Environment.UserName);
headers.Add("caller-machine", Environment.MachineName);
headers.Add("caller-os", Environment.OSVersion.ToString());
}

private void LogError(Exception ex)
{
_logger.LogError(ex, $"Call error: {ex.Message}");
}
}
}