Skip to content

Commit

Permalink
feat: add support for agent cache (G-Research#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
winstxnhdw authored Jul 28, 2023
1 parent 39e0ec3 commit fc9c4f8
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 47 deletions.
24 changes: 24 additions & 0 deletions Consul.Test/AgentTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,30 @@ public async Task Agent_Checks_ServiceBound()
await _client.Agent.ServiceDeregister(svcID);
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task Agent_UseCache(bool useCache)
{
var opts = new QueryOptions
{
UseCache = useCache,
MaxAge = TimeSpan.FromSeconds(10),
StaleIfError = TimeSpan.FromSeconds(10),
};

var response = await _client.Catalog.Datacenters(opts);

if (useCache)
{
Assert.NotNull(response.XCache);
}
else
{
Assert.Null(response.XCache);
}
}

[Fact]
public async Task Agent_Join()
{
Expand Down
45 changes: 37 additions & 8 deletions Consul.Test/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,31 @@ public async Task Client_SetQueryOptions()
Assert.Equal("1m40s", request.Params["wait"]);
}

[Fact]
public async Task Client_SetQueryOptionsWithCache()
{
var opts = new QueryOptions()
{
Datacenter = "foo",
Consistency = ConsistencyMode.Default,
WaitIndex = 1000,
WaitTime = new TimeSpan(0, 0, 100),
Token = "12345",
UseCache = true,
MaxAge = new TimeSpan(0, 0, 10),
StaleIfError = new TimeSpan(0, 0, 10)
};
var request = _client.Get<KVPair>("/v1/kv/foo", opts);

await Assert.ThrowsAsync<ConsulRequestException>(async () => await request.Execute(CancellationToken.None));

Assert.Equal("foo", request.Params["dc"]);
Assert.Equal("1000", request.Params["index"]);
Assert.Equal("1m40s", request.Params["wait"]);
Assert.Equal(string.Empty, request.Params["cached"]);
Assert.Equal("max-age=10,stale-if-error=10", request.Params["Cache-Control"]);
}

[Fact]
public async Task Client_SetClientOptions()
{
Expand Down Expand Up @@ -138,9 +163,11 @@ public async Task Client_CustomHttpClient()
hc.Timeout = TimeSpan.FromDays(10);
hc.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

var config = new ConsulClientConfiguration();
config.Address = TestHelper.HttpUri;
config.Token = TestHelper.MasterToken;
var config = new ConsulClientConfiguration
{
Address = TestHelper.HttpUri,
Token = TestHelper.MasterToken
};

#pragma warning disable CS0618 // Type or member is obsolete
using (var client = new ConsulClient(config, hc))
Expand Down Expand Up @@ -175,9 +202,11 @@ public async Task Client_DisposeBehavior()
[Fact]
public async Task Client_ReuseAndUpdateConfig()
{
var config = new ConsulClientConfiguration();
config.Address = TestHelper.HttpUri;
config.Token = TestHelper.MasterToken;
var config = new ConsulClientConfiguration
{
Address = TestHelper.HttpUri,
Token = TestHelper.MasterToken
};

#pragma warning disable CS0618 // Type or member is obsolete
using (var client = new ConsulClient(config))
Expand Down Expand Up @@ -219,8 +248,8 @@ public async Task Client_ReuseAndUpdateConfig()
[Fact]
public void Client_Constructors()
{
Action<ConsulClientConfiguration> cfgAction2 = (cfg) => { cfg.Token = "yep"; };
Action<ConsulClientConfiguration> cfgAction = (cfg) => { cfg.Datacenter = "foo"; cfgAction2(cfg); };
void cfgAction2(ConsulClientConfiguration cfg) { cfg.Token = "yep"; }
void cfgAction(ConsulClientConfiguration cfg) { cfg.Datacenter = "foo"; cfgAction2(cfg); }

using (var c = new ConsulClient())
{
Expand Down
40 changes: 23 additions & 17 deletions Consul/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ internal Catalog(ConsulClient c)
/// </summary>
/// <param name="reg">A catalog registration</param>
/// <returns>An empty write result</returns>
public Task<WriteResult> Register(CatalogRegistration reg, CancellationToken ct = default(CancellationToken))
public Task<WriteResult> Register(CatalogRegistration reg, CancellationToken ct = default)
{
return Register(reg, WriteOptions.Default, ct);
}
Expand All @@ -112,7 +112,7 @@ internal Catalog(ConsulClient c)
/// <param name="reg">A catalog registration</param>
/// <param name="q">Customized write options</param>
/// <returns>An empty write result</returns>
public Task<WriteResult> Register(CatalogRegistration reg, WriteOptions q, CancellationToken ct = default(CancellationToken))
public Task<WriteResult> Register(CatalogRegistration reg, WriteOptions q, CancellationToken ct = default)
{
return _client.Put("/v1/catalog/register", reg, q).Execute(ct);
}
Expand All @@ -122,7 +122,7 @@ internal Catalog(ConsulClient c)
/// </summary>
/// <param name="reg">A catalog deregistration</param>
/// <returns>An empty write result</returns>
public Task<WriteResult> Deregister(CatalogDeregistration reg, CancellationToken ct = default(CancellationToken))
public Task<WriteResult> Deregister(CatalogDeregistration reg, CancellationToken ct = default)
{
return Deregister(reg, WriteOptions.Default, ct);
}
Expand All @@ -133,7 +133,7 @@ internal Catalog(ConsulClient c)
/// <param name="reg">A catalog deregistration</param>
/// <param name="q">Customized write options</param>
/// <returns>An empty write result</returns>
public Task<WriteResult> Deregister(CatalogDeregistration reg, WriteOptions q, CancellationToken ct = default(CancellationToken))
public Task<WriteResult> Deregister(CatalogDeregistration reg, WriteOptions q, CancellationToken ct = default)
{
return _client.Put("/v1/catalog/deregister", reg, q).Execute(ct);
}
Expand All @@ -142,16 +142,25 @@ internal Catalog(ConsulClient c)
/// Datacenters is used to query for all the known datacenters
/// </summary>
/// <returns>A list of datacenter names</returns>
public Task<QueryResult<string[]>> Datacenters(CancellationToken ct = default(CancellationToken))
public Task<QueryResult<string[]>> Datacenters(CancellationToken ct = default)
{
return _client.Get<string[]>("/v1/catalog/datacenters").Execute(ct);
}

/// <summary>
/// Datacenters is used to query for all the known datacenters with custom query options
/// </summary>
/// <returns>A list of datacenter names</returns>
public Task<QueryResult<string[]>> Datacenters(QueryOptions q, CancellationToken ct = default)
{
return _client.Get<string[]>("/v1/catalog/datacenters", q).Execute(ct);
}

/// <summary>
/// Nodes is used to query all the known nodes
/// </summary>
/// <returns>A list of all nodes</returns>
public Task<QueryResult<Node[]>> Nodes(CancellationToken ct = default(CancellationToken))
public Task<QueryResult<Node[]>> Nodes(CancellationToken ct = default)
{
return Nodes(QueryOptions.Default, ct);
}
Expand All @@ -162,7 +171,7 @@ internal Catalog(ConsulClient c)
/// <param name="q">Customized query options</param>
/// <param name="ct">Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing</param>
/// <returns>A list of all nodes</returns>
public Task<QueryResult<Node[]>> Nodes(QueryOptions q, CancellationToken ct = default(CancellationToken))
public Task<QueryResult<Node[]>> Nodes(QueryOptions q, CancellationToken ct = default)
{
return _client.Get<Node[]>("/v1/catalog/nodes", q).Execute(ct);
}
Expand All @@ -171,7 +180,7 @@ internal Catalog(ConsulClient c)
/// Services is used to query for all known services
/// </summary>
/// <returns>A list of all services</returns>
public Task<QueryResult<Dictionary<string, string[]>>> Services(CancellationToken ct = default(CancellationToken))
public Task<QueryResult<Dictionary<string, string[]>>> Services(CancellationToken ct = default)
{
return Services(QueryOptions.Default, ct);
}
Expand All @@ -182,7 +191,7 @@ internal Catalog(ConsulClient c)
/// <param name="q">Customized query options</param>
/// <param name="ct">Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing</param>
/// <returns>A list of all services</returns>
public Task<QueryResult<Dictionary<string, string[]>>> Services(QueryOptions q, CancellationToken ct = default(CancellationToken))
public Task<QueryResult<Dictionary<string, string[]>>> Services(QueryOptions q, CancellationToken ct = default)
{
return _client.Get<Dictionary<string, string[]>>("/v1/catalog/services", q).Execute(ct);
}
Expand All @@ -193,7 +202,7 @@ internal Catalog(ConsulClient c)
/// <param name="service">The service ID</param>
/// <param name="ct">Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing</param>
/// <returns>A list of service instances</returns>
public Task<QueryResult<CatalogService[]>> Service(string service, CancellationToken ct = default(CancellationToken))
public Task<QueryResult<CatalogService[]>> Service(string service, CancellationToken ct = default)
{
return Service(service, string.Empty, QueryOptions.Default, ct);
}
Expand All @@ -205,7 +214,7 @@ internal Catalog(ConsulClient c)
/// <param name="tag">A tag to filter on</param>
/// <param name="ct">Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing</param>
/// <returns>A list of service instances</returns>
public Task<QueryResult<CatalogService[]>> Service(string service, string tag, CancellationToken ct = default(CancellationToken))
public Task<QueryResult<CatalogService[]>> Service(string service, string tag, CancellationToken ct = default)
{
return Service(service, tag, QueryOptions.Default, ct);
}
Expand Down Expand Up @@ -234,7 +243,7 @@ public Task<QueryResult<CatalogService[]>> Service(string service, string tag, Q
/// <param name="node">The node name</param>
/// <param name="ct">Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing</param>
/// <returns>The node information including a list of services</returns>
public Task<QueryResult<CatalogNode>> Node(string node, CancellationToken ct = default(CancellationToken))
public Task<QueryResult<CatalogNode>> Node(string node, CancellationToken ct = default)
{
return Node(node, QueryOptions.Default, ct);
}
Expand All @@ -246,7 +255,7 @@ public Task<QueryResult<CatalogService[]>> Service(string service, string tag, Q
/// <param name="q">Customized query options</param>
/// <param name="ct">Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing</param>
/// <returns>The node information including a list of services</returns>
public Task<QueryResult<CatalogNode>> Node(string node, QueryOptions q, CancellationToken ct = default(CancellationToken))
public Task<QueryResult<CatalogNode>> Node(string node, QueryOptions q, CancellationToken ct = default)
{
return _client.Get<CatalogNode>(string.Format("/v1/catalog/node/{0}", node), q).Execute(ct);
}
Expand All @@ -259,9 +268,6 @@ public partial class ConsulClient : IConsulClient
/// <summary>
/// Catalog returns a handle to the catalog endpoints
/// </summary>
public ICatalogEndpoint Catalog
{
get { return _catalog.Value; }
}
public ICatalogEndpoint Catalog => _catalog.Value;
}
}
77 changes: 72 additions & 5 deletions Consul/Client_GetRequests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -53,7 +54,7 @@ public GetRequest(ConsulClient client, string url, QueryOptions options, IEncoda
{
if (string.IsNullOrEmpty(url))
{
throw new ArgumentException(nameof(url));
throw new ArgumentException(null, nameof(url));
}
Options = options ?? QueryOptions.Default;
Filter = filter;
Expand Down Expand Up @@ -116,16 +117,15 @@ public async Task<QueryResult<Stream>> ExecuteStreaming(CancellationToken ct)
ApplyHeaders(message, Client.Config);
var response = await Client.HttpClient.SendAsync(message, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false);

ParseQueryHeaders(response, (result as QueryResult<TOut>));
ParseQueryHeaders(response, result as QueryResult<TOut>);
result.StatusCode = response.StatusCode;
ResponseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

result.Response = ResponseStream;

if (response.StatusCode != HttpStatusCode.NotFound && !response.IsSuccessStatusCode)
{
throw new ConsulRequestException(string.Format("Unexpected response, status code {0}",
response.StatusCode), response.StatusCode);
throw new ConsulRequestException($"Unexpected response, status code {response.StatusCode}", response.StatusCode);
}

result.RequestTime = timer.Elapsed;
Expand Down Expand Up @@ -171,6 +171,27 @@ protected override void ApplyOptions(ConsulClientConfiguration clientConfig)
{
Params["near"] = Options.Near;
}

if (Options.UseCache)
{
Params["cached"] = string.Empty;
var cacheControl = new List<string>();

if (Options.MaxAge.HasValue)
{
cacheControl.Add($"max-age={Convert.ToUInt64(Options.MaxAge.Value.TotalSeconds)}");
}

if (Options.StaleIfError.HasValue)
{
cacheControl.Add($"stale-if-error={Convert.ToUInt64(Options.StaleIfError.Value.TotalSeconds)}");
}

if (cacheControl.Count > 0)
{
Params["Cache-Control"] = string.Join(",", cacheControl);
}
}
}

protected void ParseQueryHeaders(HttpResponseMessage res, QueryResult<TOut> meta)
Expand Down Expand Up @@ -224,6 +245,31 @@ protected void ParseQueryHeaders(HttpResponseMessage res, QueryResult<TOut> meta
throw new ConsulRequestException("Failed to parse X-Consul-Translate-Addresses", res.StatusCode, ex);
}
}

if (headers.Contains("X-Cache"))
{
try
{

meta.XCache = headers.GetValues("X-Cache").Single() == "HIT" ? QueryResult.CacheResult.Hit : QueryResult.CacheResult.Miss;
}
catch (Exception ex)
{
throw new ConsulRequestException("Failed to parse X-Cache", res.StatusCode, ex);
}
}

if (headers.Contains("Age"))
{
try
{
meta.Age = TimeSpan.FromSeconds(double.Parse(headers.GetValues("Age").Single()));
}
catch (Exception ex)
{
throw new ConsulRequestException("Failed to parse Age", res.StatusCode, ex);
}
}
}

protected override void ApplyHeaders(HttpRequestMessage message, ConsulClientConfiguration clientConfig)
Expand All @@ -246,7 +292,7 @@ public GetRequest(ConsulClient client, string url, QueryOptions options = null)
{
if (string.IsNullOrEmpty(url))
{
throw new ArgumentException(nameof(url));
throw new ArgumentException(null, nameof(url));
}
Options = options ?? QueryOptions.Default;
}
Expand Down Expand Up @@ -306,6 +352,27 @@ protected override void ApplyOptions(ConsulClientConfiguration clientConfig)
{
Params["dc"] = Options.Datacenter;
}

if (Options.UseCache)
{
Params["cached"] = string.Empty;
var cacheControl = new List<string>();

if (Options.MaxAge.HasValue)
{
cacheControl.Add($"max-age={Convert.ToUInt64(Options.MaxAge.Value.TotalSeconds)}");
}

if (Options.StaleIfError.HasValue)
{
cacheControl.Add($"stale-if-error={Convert.ToUInt64(Options.StaleIfError.Value.TotalSeconds)}");
}

if (cacheControl.Count > 0)
{
Params["Cache-Control"] = string.Join(",", cacheControl);
}
}
}

protected override void ApplyHeaders(HttpRequestMessage message, ConsulClientConfiguration clientConfig)
Expand Down
Loading

0 comments on commit fc9c4f8

Please sign in to comment.