Skip to content

Commit

Permalink
Add cluster to ConfigureRouteAsync (#1231)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kahbazi authored Sep 7, 2021
1 parent 8b22f3a commit bda7ace
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ csharp_using_directive_placement = outside_namespace:error

# New-line options
# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-formatting-conventions?view=vs-2019#new-line-options
csharp_new_line_before_open_brace = methods, properties, control_blocks, types, anonymous_methods, lambdas, object_collection_array_initializers
csharp_new_line_before_open_brace = methods, properties, control_blocks, types, anonymous_methods, lambdas, object_collection_array_initializers, accessors
csharp_new_line_before_else = true
csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig origCluster,
return new ValueTask<ClusterConfig>(origCluster with { Destinations = newDests });
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
// Example: do not let config based routes take priority over code based routes.
// Lower numbers are higher priority. Code routes default to 0.
Expand Down
3 changes: 2 additions & 1 deletion src/ReverseProxy/Configuration/IProxyConfigFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface IProxyConfigFilter
/// Allows modification of a route configuration.
/// </summary>
/// <param name="route">The <see cref="RouteConfig"/> instance to configure.</param>
ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel);
/// <param name="cluster">The <see cref="ClusterConfig"/> instance related to <see cref="RouteConfig"/>.</param>
ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig? cluster, CancellationToken cancel);
}
}
41 changes: 25 additions & 16 deletions src/ReverseProxy/Management/ProxyConfigManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
Expand All @@ -15,9 +16,9 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Yarp.ReverseProxy.Configuration;
using Yarp.ReverseProxy.Forwarder;
using Yarp.ReverseProxy.Health;
using Yarp.ReverseProxy.Model;
using Yarp.ReverseProxy.Forwarder;
using Yarp.ReverseProxy.Routing;
using Yarp.ReverseProxy.Transforms.Builder;

Expand All @@ -34,6 +35,8 @@ namespace Yarp.ReverseProxy.Management
/// </remarks>
internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
{
private static readonly IReadOnlyDictionary<string, ClusterConfig> _emptyClusterDictionary = new ReadOnlyDictionary<string, ClusterConfig>(new Dictionary<string, ClusterConfig>());

private readonly object _syncRoot = new object();
private readonly ILogger<ProxyConfigManager> _logger;
private readonly IProxyConfigProvider _provider;
Expand Down Expand Up @@ -209,21 +212,21 @@ private async Task ReloadConfigAsync()
// Throws for validation failures
private async Task<bool> ApplyConfigAsync(IProxyConfig config)
{
var (configuredRoutes, routeErrors) = await VerifyRoutesAsync(config.Routes, cancellation: default);
var (configuredClusters, clusterErrors) = await VerifyClustersAsync(config.Clusters, cancellation: default);
var (configuredRoutes, routeErrors) = await VerifyRoutesAsync(config.Routes, configuredClusters, cancellation: default);

if (routeErrors.Count > 0 || clusterErrors.Count > 0)
{
throw new AggregateException("The proxy config is invalid.", routeErrors.Concat(clusterErrors));
}

// Update clusters first because routes need to reference them.
UpdateRuntimeClusters(configuredClusters);
UpdateRuntimeClusters(configuredClusters.Values);
var routesChanged = UpdateRuntimeRoutes(configuredRoutes);
return routesChanged;
}

private async Task<(IList<RouteConfig>, IList<Exception>)> VerifyRoutesAsync(IReadOnlyList<RouteConfig> routes, CancellationToken cancellation)
private async Task<(IList<RouteConfig>, IList<Exception>)> VerifyRoutesAsync(IReadOnlyList<RouteConfig> routes, IReadOnlyDictionary<string, ClusterConfig> clusters, CancellationToken cancellation)
{
if (routes == null)
{
Expand All @@ -246,9 +249,18 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)

try
{
foreach (var filter in _filters)
if (_filters.Length != 0)
{
route = await filter.ConfigureRouteAsync(route, cancellation);
ClusterConfig? cluster = null;
if (route.ClusterId != null)
{
clusters.TryGetValue(route.ClusterId, out cluster);
}

foreach (var filter in _filters)
{
route = await filter.ConfigureRouteAsync(route, cluster, cancellation);
}
}
}
catch (Exception ex)
Expand All @@ -275,29 +287,26 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)
return (configuredRoutes, errors);
}

private async Task<(IList<ClusterConfig>, IList<Exception>)> VerifyClustersAsync(IReadOnlyList<ClusterConfig> clusters, CancellationToken cancellation)
private async Task<(IReadOnlyDictionary<string, ClusterConfig>, IList<Exception>)> VerifyClustersAsync(IReadOnlyList<ClusterConfig> clusters, CancellationToken cancellation)
{
if (clusters == null)
{
return (Array.Empty<ClusterConfig>(), Array.Empty<Exception>());
return (_emptyClusterDictionary, Array.Empty<Exception>());
}

var seenClusterIds = new HashSet<string>(clusters.Count, StringComparer.OrdinalIgnoreCase);
var configuredClusters = new List<ClusterConfig>(clusters.Count);
var configuredClusters = new Dictionary<string, ClusterConfig>(clusters.Count, StringComparer.OrdinalIgnoreCase);
var errors = new List<Exception>();
// The IProxyConfigProvider provides a fresh snapshot that we need to reconfigure each time.
foreach (var c in clusters)
{
try
{
if (seenClusterIds.Contains(c.ClusterId))
if (configuredClusters.ContainsKey(c.ClusterId))
{
errors.Add(new ArgumentException($"Duplicate cluster '{c.ClusterId}'."));
continue;
}

seenClusterIds.Add(c.ClusterId);

// Don't modify the original
var cluster = c;

Expand All @@ -313,7 +322,7 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)
continue;
}

configuredClusters.Add(cluster);
configuredClusters.Add(cluster.ClusterId, cluster);
}
catch (Exception ex)
{
Expand All @@ -323,13 +332,13 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)

if (errors.Count > 0)
{
return (Array.Empty<ClusterConfig>(), errors);
return (_emptyClusterDictionary, errors);
}

return (configuredClusters, errors);
}

private void UpdateRuntimeClusters(IList<ClusterConfig> incomingClusters)
private void UpdateRuntimeClusters(IEnumerable<ClusterConfig> incomingClusters)
{
var desiredClusters = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

Expand Down
53 changes: 40 additions & 13 deletions test/ReverseProxy.Tests/Management/ProxyConfigManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
return new ValueTask<ClusterConfig>(cluster);
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
return new ValueTask<RouteConfig>(route with
{
Expand All @@ -387,45 +387,72 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
});
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
return new ValueTask<RouteConfig>(route with { Order = 12 });
string order;
if (cluster != null)
{
order = cluster.Metadata["Order"];
}
else
{
order = "12";
}

return new ValueTask<RouteConfig>(route with { Order = int.Parse(order) });
}
}

[Fact]
public async Task LoadAsync_ConfigFilterConfiguresCluster_Works()
{
var route = new RouteConfig
var route1 = new RouteConfig
{
RouteId = "route1",
ClusterId = "cluster1",
Match = new RouteMatch { Path = "/" }
};
var route2 = new RouteConfig
{
RouteId = "route2",
ClusterId = "cluster2",
Match = new RouteMatch { Path = "/" }
};
var cluster = new ClusterConfig()
{
ClusterId = "cluster1",
Destinations = new Dictionary<string, DestinationConfig>(StringComparer.OrdinalIgnoreCase)
{
{ "d1", new DestinationConfig() { Address = "http://localhost" } }
},
Metadata = new Dictionary<string, string>
{
["Order"] = "47"
}
};
var services = CreateServices(new List<RouteConfig>() { route }, new List<ClusterConfig>() { cluster }, proxyBuilder =>
var services = CreateServices(new List<RouteConfig>() { route1, route2 }, new List<ClusterConfig>() { cluster }, proxyBuilder =>
{
proxyBuilder.AddConfigFilter<ClusterAndRouteFilter>();
});
var manager = services.GetRequiredService<ProxyConfigManager>();
var dataSource = await manager.InitialLoadAsync();

Assert.NotNull(dataSource);
var endpoint = Assert.Single(dataSource.Endpoints);
var routeConfig = endpoint.Metadata.GetMetadata<RouteModel>();
var clusterState = routeConfig.Cluster;
Assert.NotNull(clusterState);
Assert.True(clusterState.Model.Config.HealthCheck.Active.Enabled);
Assert.Equal(TimeSpan.FromSeconds(12), clusterState.Model.Config.HealthCheck.Active.Interval);
var destination = Assert.Single(clusterState.DestinationsState.AllDestinations);
Assert.Equal(2, dataSource.Endpoints.Count);

var endpoint1 = Assert.Single(dataSource.Endpoints.Where(x => x.DisplayName == "route1"));
var routeConfig1 = endpoint1.Metadata.GetMetadata<RouteModel>();
Assert.Equal(47, routeConfig1.Config.Order);
var clusterState1 = routeConfig1.Cluster;
Assert.NotNull(clusterState1);
Assert.True(clusterState1.Model.Config.HealthCheck.Active.Enabled);
Assert.Equal(TimeSpan.FromSeconds(12), clusterState1.Model.Config.HealthCheck.Active.Interval);
var destination = Assert.Single(clusterState1.DestinationsState.AllDestinations);
Assert.Equal("http://localhost", destination.Model.Config.Address);

var endpoint2 = Assert.Single(dataSource.Endpoints.Where(x => x.DisplayName == "route2"));
var routeConfig2 = endpoint2.Metadata.GetMetadata<RouteModel>();
Assert.Equal(12, routeConfig2.Config.Order);
}

private class ClusterAndRouteThrows : IProxyConfigFilter
Expand All @@ -435,7 +462,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
throw new NotFiniteNumberException("Test exception");
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
throw new NotFiniteNumberException("Test exception");
}
Expand Down
2 changes: 1 addition & 1 deletion testassets/ReverseProxy.Config/CustomConfigFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
return new ValueTask<ClusterConfig>(cluster);
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
// Do not let config based routes take priority over code based routes.
// Lower numbers are higher priority. Code routes default to 0.
Expand Down

0 comments on commit bda7ace

Please sign in to comment.