Skip to content

Commit

Permalink
Followup after #2098 (#2127)
Browse files Browse the repository at this point in the history
* Followup after #2098

* Include CreateRequest under destination_health_check activity
  • Loading branch information
MihaZupan authored May 9, 2023
1 parent be46175 commit f3c5a14
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 39 deletions.
3 changes: 1 addition & 2 deletions src/ReverseProxy/Forwarder/ForwarderMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ public async Task Invoke(HttpContext context)
activity?.AddTag("proxy.route_id", route.Config.RouteId);
activity?.AddTag("proxy.cluster_id", cluster.ClusterId);


if (destinations.Count == 0)
{
Log.NoAvailableDestinations(_logger, cluster.ClusterId);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.NoAvailableDestinations, ex: null));
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddError("Proxy forwarding failed","No available destinations to route to");
activity?.AddError("Proxy forwarding failed", "No available destinations to forward to");
return;
}

Expand Down
45 changes: 21 additions & 24 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private async Task ProbeCluster(ClusterState cluster)
}

// Creates an Activity to trace the active health checks
var activity = Observability.YarpActivitySource.StartActivity("proxy.cluster_health_checks", ActivityKind.Consumer);
using var activity = Observability.YarpActivitySource.StartActivity("proxy.cluster_health_checks", ActivityKind.Consumer);
activity?.AddTag("proxy.cluster_id", cluster.ClusterId);

Log.StartingActiveHealthProbingOnCluster(_logger, cluster.ClusterId);
Expand Down Expand Up @@ -157,12 +157,15 @@ private async Task ProbeCluster(ClusterState cluster)
}

Log.StoppedActiveHealthProbingOnCluster(_logger, cluster.ClusterId);
activity?.Stop();
}
}

private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState cluster, DestinationState destination, TimeSpan timeout)
{
using var probeActivity = Observability.YarpActivitySource.StartActivity("proxy.destination_health_check", ActivityKind.Client);
probeActivity?.AddTag("proxy.cluster_id", cluster.ClusterId);
probeActivity?.AddTag("proxy.destination_id", destination.DestinationId);

HttpRequestMessage request;
try
{
Expand All @@ -172,36 +175,30 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
{
Log.ActiveHealthProbeConstructionFailedOnCluster(_logger, destination.DestinationId, cluster.ClusterId, ex);

probeActivity?.SetStatus(ActivityStatusCode.Error);

return new DestinationProbingResult(destination, null, ex);
}

using (var probeActivity = Observability.YarpActivitySource.StartActivity("proxy.destination_health_check", ActivityKind.Client))
using var cts = new CancellationTokenSource(timeout);

try
{
probeActivity?.AddTag("proxy.cluster_id", cluster.ClusterId);
probeActivity?.AddTag("proxy.destination_id", destination.DestinationId);
var cts = new CancellationTokenSource(timeout);
try
{
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
var response = await cluster.Model.HttpClient.SendAsync(request, cts.Token);
Log.DestinationProbingCompleted(_logger, destination.DestinationId, cluster.ClusterId, (int)response.StatusCode);
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
var response = await cluster.Model.HttpClient.SendAsync(request, cts.Token);
Log.DestinationProbingCompleted(_logger, destination.DestinationId, cluster.ClusterId, (int)response.StatusCode);

probeActivity?.SetStatus(ActivityStatusCode.Ok);
probeActivity?.SetStatus(ActivityStatusCode.Ok);

return new DestinationProbingResult(destination, response, null);
}
catch (Exception ex)
{
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);
return new DestinationProbingResult(destination, response, null);
}
catch (Exception ex)
{
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);

probeActivity?.SetStatus(ActivityStatusCode.Error);
probeActivity?.SetStatus(ActivityStatusCode.Error);

return new DestinationProbingResult(destination, null, ex);
}
finally
{
cts.Dispose();
}
return new DestinationProbingResult(destination, null, ex);
}
}
}
23 changes: 19 additions & 4 deletions src/ReverseProxy/Model/ProxyPipelineInitializerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ProxyPipelineInitializerMiddleware(RequestDelegate next,
_next = next ?? throw new ArgumentNullException(nameof(next));
}

public async Task Invoke(HttpContext context)
public Task Invoke(HttpContext context)
{
var endpoint = context.GetEndpoint()
?? throw new InvalidOperationException($"Routing Endpoint wasn't set for the current request.");
Expand All @@ -39,7 +39,7 @@ public async Task Invoke(HttpContext context)
{
Log.NoClusterFound(_logger, route.Config.RouteId);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
return;
return Task.CompletedTask;
}

var destinationsState = cluster.DestinationsState;
Expand All @@ -51,11 +51,26 @@ public async Task Invoke(HttpContext context)
AvailableDestinations = destinationsState.AvailableDestinations,
});

using (var activity = Observability.YarpActivitySource.StartActivity("proxy.forwarder", ActivityKind.Server))
var activity = Observability.YarpActivitySource.CreateActivity("proxy.forwarder", ActivityKind.Server);

return activity is null
? _next(context)
: AwaitWithActivity(context, activity);
}

private async Task AwaitWithActivity(HttpContext context, Activity activity)
{
context.SetYarpActivity(activity);

activity.Start();
try
{
context.SetYarpActivity(activity);
await _next(context);
}
finally
{
activity.Dispose();
}
}

private static class Log
Expand Down
11 changes: 3 additions & 8 deletions src/ReverseProxy/Utilities/Observability.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.Http;

namespace Yarp.ReverseProxy.Utilities;
Expand All @@ -23,15 +17,16 @@ internal static class Observability

public static void SetYarpActivity(this HttpContext context, Activity? activity)
{
if (activity != null)
if (activity is not null)
{
context.Features[typeof(YarpActivity)] = activity;
}
}

public static void AddError(this Activity activity, string message, string description)
{
if (activity != null) {
if (activity is not null)
{
var tagsCollection = new ActivityTagsCollection
{
{ "error", message },
Expand Down
2 changes: 1 addition & 1 deletion src/TelemetryConsumption/Http/HttpEventListenerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected override void OnEvent(IHttpTelemetryConsumer[] consumers, EventWritten
break;

case 3:
Debug.Assert(eventData.EventName == "RequestFailed" && payload.Count == 0);
Debug.Assert(eventData.EventName == "RequestFailed" /* && payload.Count == 0 */);
{
foreach (var consumer in consumers)
{
Expand Down

0 comments on commit f3c5a14

Please sign in to comment.