Skip to content
This repository has been archived by the owner on Sep 8, 2023. It is now read-only.

Commit

Permalink
fix stream failure (#15)
Browse files Browse the repository at this point in the history
* remove dead code

`_metricServer` isn't used, so there is no need for the ClientInterceptor
to be IDisposable. Updated the ServerInterceptor for symmetry.

* Also send failure to prometheus when a stream fails halfway through

When the stream is successfully started, but throws an exception
halfway through. Then the metric should be updated with a failure

Co-authored-by: Tom Moers <tom.moers@tomra.com>
  • Loading branch information
tmoers and tmoers authored Jun 22, 2021
1 parent 9e9033a commit f6071cd
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 33 deletions.
28 changes: 15 additions & 13 deletions NetGrpcPrometheus/ClientInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ namespace NetGrpcPrometheus
/// <summary>
/// Interceptor for intercepting calls on client side
/// </summary>
public class ClientInterceptor : Interceptor, IDisposable
public class ClientInterceptor : Interceptor
{
private readonly MetricServer _metricServer;

private readonly MetricsBase _metrics;

Expand Down Expand Up @@ -178,10 +177,16 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
{
AsyncServerStreamingCall<TResponse> streamingCall = continuation(request, context);

result = new AsyncServerStreamingCall<TResponse>(
new WrapperStreamReader<TResponse>(streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); }), streamingCall.ResponseHeadersAsync,
streamingCall.GetStatus, streamingCall.GetTrailers, streamingCall.Dispose);
result =
new AsyncServerStreamingCall<TResponse>(
new WrapperStreamReader<TResponse>(
streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); },
statusCode => { _metrics.ResponseCounterInc(method, statusCode); }),
streamingCall.ResponseHeadersAsync,
streamingCall.GetStatus,
streamingCall.GetTrailers,
streamingCall.Dispose);

_metrics.ResponseCounterInc(method, StatusCode.OK);
}
Expand Down Expand Up @@ -216,8 +221,10 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami
AsyncDuplexStreamingCall<TRequest, TResponse> streamingCall = continuation(context);

WrapperStreamReader<TResponse> responseStream =
new WrapperStreamReader<TResponse>(streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); });
new WrapperStreamReader<TResponse>(
streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); },
statusCode => { _metrics.ResponseCounterInc(method, statusCode); });

result = new AsyncDuplexStreamingCall<TRequest, TResponse>(
new WrapperClientStreamWriter<TRequest>(streamingCall.RequestStream,
Expand All @@ -240,10 +247,5 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami

return result;
}

public void Dispose()
{
((IDisposable) _metricServer)?.Dispose();
}
}
}
23 changes: 14 additions & 9 deletions NetGrpcPrometheus/Helpers/WrapperStreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,38 @@ public class WrapperStreamReader<T> : IAsyncStreamReader<T>

private readonly IAsyncStreamReader<T> _reader;
private readonly Action _onMessage;
private readonly Action<StatusCode> _onError;

/// <summary>
/// Constructor for <see cref="IAsyncStreamReader{T}"/> wrapper
/// </summary>
/// <param name="reader">Stream reader that should be wrapped by this class</param>
/// <param name="onMessage">Action that should be executed on each message received from the stream</param>
public WrapperStreamReader(IAsyncStreamReader<T> reader, Action onMessage)
public WrapperStreamReader(IAsyncStreamReader<T> reader, Action onMessage, Action<StatusCode> onError)
{
_reader = reader;
_onMessage = onMessage;
_onError = onError;
}

public void Dispose()
{

}

public Task<bool> MoveNext(CancellationToken cancellationToken)
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
Task<bool> result = _reader.MoveNext(cancellationToken);

result.ContinueWith(task =>
try
{
var result = await _reader.MoveNext(cancellationToken);
_onMessage.Invoke();
}, cancellationToken);

return result;
return result;
}
catch (RpcException e)
{
_onError(e.StatusCode);
throw;
}
}
}
}
19 changes: 10 additions & 9 deletions NetGrpcPrometheus/ServerInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NetGrpcPrometheus
/// <summary>
/// Interceptor for intercepting calls on server side
/// </summary>
public class ServerInterceptor : Interceptor, IDisposable
public class ServerInterceptor : Interceptor
{
private readonly MetricsBase _metrics;

Expand Down Expand Up @@ -115,8 +115,11 @@ public override Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse
try
{
result = continuation(
new WrapperStreamReader<TRequest>(requestStream,
() => { _metrics.StreamReceivedCounterInc(method); }), context);
new WrapperStreamReader<TRequest>(
requestStream,
() => { _metrics.StreamReceivedCounterInc(method); },
statusCode => { _metrics.ResponseCounterInc(method, statusCode); }),
context);

_metrics.ResponseCounterInc(method, StatusCode.OK);
}
Expand Down Expand Up @@ -151,8 +154,10 @@ public override Task DuplexStreamingServerHandler<TRequest, TResponse>(
try
{
result = continuation(
new WrapperStreamReader<TRequest>(requestStream,
() => { _metrics.StreamReceivedCounterInc(method); }),
new WrapperStreamReader<TRequest>(
requestStream,
() => { _metrics.StreamReceivedCounterInc(method); },
statusCode => { _metrics.ResponseCounterInc(method, statusCode); }),
new WrapperServerStreamWriter<TResponse>(responseStream,
() => { _metrics.StreamSentCounterInc(method); }), context);

Expand All @@ -171,9 +176,5 @@ public override Task DuplexStreamingServerHandler<TRequest, TResponse>(

return result;
}

public void Dispose()
{
}
}
}
1 change: 0 additions & 1 deletion NetGrpcPrometheusTest/Helpers/TestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public void Wait()
public void Dispose()
{
_metricsServer.StopAsync().Wait();
_interceptor.Dispose();
}
}
}
1 change: 0 additions & 1 deletion NetGrpcPrometheusTest/Helpers/TestServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public void Shutdown()
public void Dispose()
{
Shutdown();
_interceptor?.Dispose();
}
}
}

0 comments on commit f6071cd

Please sign in to comment.