From fc9c4f836cb94da4a64bea171619918eab16cf71 Mon Sep 17 00:00:00 2001 From: Winston H <56998716+winstxnhdw@users.noreply.github.com> Date: Fri, 28 Jul 2023 17:42:38 +0800 Subject: [PATCH] feat: add support for agent cache (#225) --- Consul.Test/AgentTest.cs | 24 +++++++++ Consul.Test/ClientTest.cs | 45 +++++++++++++--- Consul/Catalog.cs | 40 ++++++++------ Consul/Client_GetRequests.cs | 77 +++++++++++++++++++++++++-- Consul/Client_Options.cs | 28 +++++++++- Consul/Client_Results.cs | 17 ++++++ Consul/Interfaces/ICatalogEndpoint.cs | 29 +++++----- Consul/Lock.cs | 2 +- 8 files changed, 215 insertions(+), 47 deletions(-) diff --git a/Consul.Test/AgentTest.cs b/Consul.Test/AgentTest.cs index 6f561c40d..bdecf2a34 100644 --- a/Consul.Test/AgentTest.cs +++ b/Consul.Test/AgentTest.cs @@ -408,6 +408,30 @@ public async Task Agent_Checks_ServiceBound() await _client.Agent.ServiceDeregister(svcID); } + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task Agent_UseCache(bool useCache) + { + var opts = new QueryOptions + { + UseCache = useCache, + MaxAge = TimeSpan.FromSeconds(10), + StaleIfError = TimeSpan.FromSeconds(10), + }; + + var response = await _client.Catalog.Datacenters(opts); + + if (useCache) + { + Assert.NotNull(response.XCache); + } + else + { + Assert.Null(response.XCache); + } + } + [Fact] public async Task Agent_Join() { diff --git a/Consul.Test/ClientTest.cs b/Consul.Test/ClientTest.cs index 6458fb2a5..22eff65d7 100644 --- a/Consul.Test/ClientTest.cs +++ b/Consul.Test/ClientTest.cs @@ -94,6 +94,31 @@ public async Task Client_SetQueryOptions() Assert.Equal("1m40s", request.Params["wait"]); } + [Fact] + public async Task Client_SetQueryOptionsWithCache() + { + var opts = new QueryOptions() + { + Datacenter = "foo", + Consistency = ConsistencyMode.Default, + WaitIndex = 1000, + WaitTime = new TimeSpan(0, 0, 100), + Token = "12345", + UseCache = true, + MaxAge = new TimeSpan(0, 0, 10), + StaleIfError = new TimeSpan(0, 0, 10) + }; + var request = _client.Get("/v1/kv/foo", opts); + + await Assert.ThrowsAsync(async () => await request.Execute(CancellationToken.None)); + + Assert.Equal("foo", request.Params["dc"]); + Assert.Equal("1000", request.Params["index"]); + Assert.Equal("1m40s", request.Params["wait"]); + Assert.Equal(string.Empty, request.Params["cached"]); + Assert.Equal("max-age=10,stale-if-error=10", request.Params["Cache-Control"]); + } + [Fact] public async Task Client_SetClientOptions() { @@ -138,9 +163,11 @@ public async Task Client_CustomHttpClient() hc.Timeout = TimeSpan.FromDays(10); hc.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); - var config = new ConsulClientConfiguration(); - config.Address = TestHelper.HttpUri; - config.Token = TestHelper.MasterToken; + var config = new ConsulClientConfiguration + { + Address = TestHelper.HttpUri, + Token = TestHelper.MasterToken + }; #pragma warning disable CS0618 // Type or member is obsolete using (var client = new ConsulClient(config, hc)) @@ -175,9 +202,11 @@ public async Task Client_DisposeBehavior() [Fact] public async Task Client_ReuseAndUpdateConfig() { - var config = new ConsulClientConfiguration(); - config.Address = TestHelper.HttpUri; - config.Token = TestHelper.MasterToken; + var config = new ConsulClientConfiguration + { + Address = TestHelper.HttpUri, + Token = TestHelper.MasterToken + }; #pragma warning disable CS0618 // Type or member is obsolete using (var client = new ConsulClient(config)) @@ -219,8 +248,8 @@ public async Task Client_ReuseAndUpdateConfig() [Fact] public void Client_Constructors() { - Action cfgAction2 = (cfg) => { cfg.Token = "yep"; }; - Action cfgAction = (cfg) => { cfg.Datacenter = "foo"; cfgAction2(cfg); }; + void cfgAction2(ConsulClientConfiguration cfg) { cfg.Token = "yep"; } + void cfgAction(ConsulClientConfiguration cfg) { cfg.Datacenter = "foo"; cfgAction2(cfg); } using (var c = new ConsulClient()) { diff --git a/Consul/Catalog.cs b/Consul/Catalog.cs index f47427bd6..7d1fb59bb 100644 --- a/Consul/Catalog.cs +++ b/Consul/Catalog.cs @@ -101,7 +101,7 @@ internal Catalog(ConsulClient c) /// /// A catalog registration /// An empty write result - public Task Register(CatalogRegistration reg, CancellationToken ct = default(CancellationToken)) + public Task Register(CatalogRegistration reg, CancellationToken ct = default) { return Register(reg, WriteOptions.Default, ct); } @@ -112,7 +112,7 @@ internal Catalog(ConsulClient c) /// A catalog registration /// Customized write options /// An empty write result - public Task Register(CatalogRegistration reg, WriteOptions q, CancellationToken ct = default(CancellationToken)) + public Task Register(CatalogRegistration reg, WriteOptions q, CancellationToken ct = default) { return _client.Put("/v1/catalog/register", reg, q).Execute(ct); } @@ -122,7 +122,7 @@ internal Catalog(ConsulClient c) /// /// A catalog deregistration /// An empty write result - public Task Deregister(CatalogDeregistration reg, CancellationToken ct = default(CancellationToken)) + public Task Deregister(CatalogDeregistration reg, CancellationToken ct = default) { return Deregister(reg, WriteOptions.Default, ct); } @@ -133,7 +133,7 @@ internal Catalog(ConsulClient c) /// A catalog deregistration /// Customized write options /// An empty write result - public Task Deregister(CatalogDeregistration reg, WriteOptions q, CancellationToken ct = default(CancellationToken)) + public Task Deregister(CatalogDeregistration reg, WriteOptions q, CancellationToken ct = default) { return _client.Put("/v1/catalog/deregister", reg, q).Execute(ct); } @@ -142,16 +142,25 @@ internal Catalog(ConsulClient c) /// Datacenters is used to query for all the known datacenters /// /// A list of datacenter names - public Task> Datacenters(CancellationToken ct = default(CancellationToken)) + public Task> Datacenters(CancellationToken ct = default) { return _client.Get("/v1/catalog/datacenters").Execute(ct); } + /// + /// Datacenters is used to query for all the known datacenters with custom query options + /// + /// A list of datacenter names + public Task> Datacenters(QueryOptions q, CancellationToken ct = default) + { + return _client.Get("/v1/catalog/datacenters", q).Execute(ct); + } + /// /// Nodes is used to query all the known nodes /// /// A list of all nodes - public Task> Nodes(CancellationToken ct = default(CancellationToken)) + public Task> Nodes(CancellationToken ct = default) { return Nodes(QueryOptions.Default, ct); } @@ -162,7 +171,7 @@ internal Catalog(ConsulClient c) /// Customized query options /// Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing /// A list of all nodes - public Task> Nodes(QueryOptions q, CancellationToken ct = default(CancellationToken)) + public Task> Nodes(QueryOptions q, CancellationToken ct = default) { return _client.Get("/v1/catalog/nodes", q).Execute(ct); } @@ -171,7 +180,7 @@ internal Catalog(ConsulClient c) /// Services is used to query for all known services /// /// A list of all services - public Task>> Services(CancellationToken ct = default(CancellationToken)) + public Task>> Services(CancellationToken ct = default) { return Services(QueryOptions.Default, ct); } @@ -182,7 +191,7 @@ internal Catalog(ConsulClient c) /// Customized query options /// Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing /// A list of all services - public Task>> Services(QueryOptions q, CancellationToken ct = default(CancellationToken)) + public Task>> Services(QueryOptions q, CancellationToken ct = default) { return _client.Get>("/v1/catalog/services", q).Execute(ct); } @@ -193,7 +202,7 @@ internal Catalog(ConsulClient c) /// The service ID /// Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing /// A list of service instances - public Task> Service(string service, CancellationToken ct = default(CancellationToken)) + public Task> Service(string service, CancellationToken ct = default) { return Service(service, string.Empty, QueryOptions.Default, ct); } @@ -205,7 +214,7 @@ internal Catalog(ConsulClient c) /// A tag to filter on /// Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing /// A list of service instances - public Task> Service(string service, string tag, CancellationToken ct = default(CancellationToken)) + public Task> Service(string service, string tag, CancellationToken ct = default) { return Service(service, tag, QueryOptions.Default, ct); } @@ -234,7 +243,7 @@ public Task> Service(string service, string tag, Q /// The node name /// Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing /// The node information including a list of services - public Task> Node(string node, CancellationToken ct = default(CancellationToken)) + public Task> Node(string node, CancellationToken ct = default) { return Node(node, QueryOptions.Default, ct); } @@ -246,7 +255,7 @@ public Task> Service(string service, string tag, Q /// Customized query options /// Cancellation token for long poll request. If set, OperationCanceledException will be thrown if the request is cancelled before completing /// The node information including a list of services - public Task> Node(string node, QueryOptions q, CancellationToken ct = default(CancellationToken)) + public Task> Node(string node, QueryOptions q, CancellationToken ct = default) { return _client.Get(string.Format("/v1/catalog/node/{0}", node), q).Execute(ct); } @@ -259,9 +268,6 @@ public partial class ConsulClient : IConsulClient /// /// Catalog returns a handle to the catalog endpoints /// - public ICatalogEndpoint Catalog - { - get { return _catalog.Value; } - } + public ICatalogEndpoint Catalog => _catalog.Value; } } diff --git a/Consul/Client_GetRequests.cs b/Consul/Client_GetRequests.cs index 50fe83687..d15cb2a0c 100644 --- a/Consul/Client_GetRequests.cs +++ b/Consul/Client_GetRequests.cs @@ -17,6 +17,7 @@ // ----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; @@ -53,7 +54,7 @@ public GetRequest(ConsulClient client, string url, QueryOptions options, IEncoda { if (string.IsNullOrEmpty(url)) { - throw new ArgumentException(nameof(url)); + throw new ArgumentException(null, nameof(url)); } Options = options ?? QueryOptions.Default; Filter = filter; @@ -116,7 +117,7 @@ public async Task> ExecuteStreaming(CancellationToken ct) ApplyHeaders(message, Client.Config); var response = await Client.HttpClient.SendAsync(message, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false); - ParseQueryHeaders(response, (result as QueryResult)); + ParseQueryHeaders(response, result as QueryResult); result.StatusCode = response.StatusCode; ResponseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); @@ -124,8 +125,7 @@ public async Task> ExecuteStreaming(CancellationToken ct) if (response.StatusCode != HttpStatusCode.NotFound && !response.IsSuccessStatusCode) { - throw new ConsulRequestException(string.Format("Unexpected response, status code {0}", - response.StatusCode), response.StatusCode); + throw new ConsulRequestException($"Unexpected response, status code {response.StatusCode}", response.StatusCode); } result.RequestTime = timer.Elapsed; @@ -171,6 +171,27 @@ protected override void ApplyOptions(ConsulClientConfiguration clientConfig) { Params["near"] = Options.Near; } + + if (Options.UseCache) + { + Params["cached"] = string.Empty; + var cacheControl = new List(); + + if (Options.MaxAge.HasValue) + { + cacheControl.Add($"max-age={Convert.ToUInt64(Options.MaxAge.Value.TotalSeconds)}"); + } + + if (Options.StaleIfError.HasValue) + { + cacheControl.Add($"stale-if-error={Convert.ToUInt64(Options.StaleIfError.Value.TotalSeconds)}"); + } + + if (cacheControl.Count > 0) + { + Params["Cache-Control"] = string.Join(",", cacheControl); + } + } } protected void ParseQueryHeaders(HttpResponseMessage res, QueryResult meta) @@ -224,6 +245,31 @@ protected void ParseQueryHeaders(HttpResponseMessage res, QueryResult meta throw new ConsulRequestException("Failed to parse X-Consul-Translate-Addresses", res.StatusCode, ex); } } + + if (headers.Contains("X-Cache")) + { + try + { + + meta.XCache = headers.GetValues("X-Cache").Single() == "HIT" ? QueryResult.CacheResult.Hit : QueryResult.CacheResult.Miss; + } + catch (Exception ex) + { + throw new ConsulRequestException("Failed to parse X-Cache", res.StatusCode, ex); + } + } + + if (headers.Contains("Age")) + { + try + { + meta.Age = TimeSpan.FromSeconds(double.Parse(headers.GetValues("Age").Single())); + } + catch (Exception ex) + { + throw new ConsulRequestException("Failed to parse Age", res.StatusCode, ex); + } + } } protected override void ApplyHeaders(HttpRequestMessage message, ConsulClientConfiguration clientConfig) @@ -246,7 +292,7 @@ public GetRequest(ConsulClient client, string url, QueryOptions options = null) { if (string.IsNullOrEmpty(url)) { - throw new ArgumentException(nameof(url)); + throw new ArgumentException(null, nameof(url)); } Options = options ?? QueryOptions.Default; } @@ -306,6 +352,27 @@ protected override void ApplyOptions(ConsulClientConfiguration clientConfig) { Params["dc"] = Options.Datacenter; } + + if (Options.UseCache) + { + Params["cached"] = string.Empty; + var cacheControl = new List(); + + if (Options.MaxAge.HasValue) + { + cacheControl.Add($"max-age={Convert.ToUInt64(Options.MaxAge.Value.TotalSeconds)}"); + } + + if (Options.StaleIfError.HasValue) + { + cacheControl.Add($"stale-if-error={Convert.ToUInt64(Options.StaleIfError.Value.TotalSeconds)}"); + } + + if (cacheControl.Count > 0) + { + Params["Cache-Control"] = string.Join(",", cacheControl); + } + } } protected override void ApplyHeaders(HttpRequestMessage message, ConsulClientConfiguration clientConfig) diff --git a/Consul/Client_Options.cs b/Consul/Client_Options.cs index 5d3393957..ebd5e72dc 100644 --- a/Consul/Client_Options.cs +++ b/Consul/Client_Options.cs @@ -38,15 +38,39 @@ public class QueryOptions }; /// - /// Providing a datacenter overwrites the DC provided by the Config + /// Providing a datacenter overwrites the DC provided by the Config. /// public string Datacenter { get; set; } /// - /// The consistency level required for the operation + /// The consistency level required for the operation. /// public ConsistencyMode Consistency { get; set; } + /// + /// UseCache requests that the agent cache results locally. + /// See https://www.consul.io/api/features/caching.html for more details on the semantics. + /// + public bool UseCache { get; set; } + + /// + /// MaxAge limits how old a cached value will be returned if UseCache is true. + /// If there is a cached response that is older than the MaxAge, it is treated as a cache miss and a new fetch invoked. + /// If the fetch fails, the error is returned. + /// Clients that wish to allow for stale results on error can set StaleIfError to a longer duration to change this behavior. + /// It is ignored if the endpoint supports background refresh caching. + /// See https://www.consul.io/api/features/caching.html for more details. + /// + public TimeSpan? MaxAge { get; set; } + + /// + /// StaleIfError specifies how stale the client will accept a cached response if the servers are unavailable to fetch a fresh one. + /// Only makes sense when UseCache is true and MaxAge is set to a lower, non-zero value. + /// It is ignored if the endpoint supports background refresh caching. + /// See https://www.consul.io/api/features/caching.html for more details. + /// + public TimeSpan? StaleIfError { get; set; } + /// /// WaitIndex is used to enable a blocking query. Waits until the timeout or the next index is reached /// diff --git a/Consul/Client_Results.cs b/Consul/Client_Results.cs index bec8f8c42..e77875eaf 100644 --- a/Consul/Client_Results.cs +++ b/Consul/Client_Results.cs @@ -44,6 +44,23 @@ public ConsulResult(ConsulResult other) /// public class QueryResult : ConsulResult { + public enum CacheResult + { + Miss, + Hit + } + + /// + /// In all cases the HTTP `X-Cache` header is always set in the response to either `HIT` or `MISS` indicating whether the response was served from cache or not. + /// + public CacheResult? XCache { get; set; } + + /// + /// For cache hits, the HTTP `Age` header is always set in the response to indicate how many seconds since that response was fetched from the servers. + /// As long as the local agent has an active connection to the servers, the age will always be 0 since the value is up-to-date. + /// + public TimeSpan Age { get; set; } + /// /// The index number when the query was serviced. This can be used as a WaitIndex to perform a blocking query /// diff --git a/Consul/Interfaces/ICatalogEndpoint.cs b/Consul/Interfaces/ICatalogEndpoint.cs index ce9bb3021..1b71bb5dd 100644 --- a/Consul/Interfaces/ICatalogEndpoint.cs +++ b/Consul/Interfaces/ICatalogEndpoint.cs @@ -29,19 +29,20 @@ namespace Consul /// public interface ICatalogEndpoint { - Task> Datacenters(CancellationToken ct = default(CancellationToken)); - Task Deregister(CatalogDeregistration reg, CancellationToken ct = default(CancellationToken)); - Task Deregister(CatalogDeregistration reg, WriteOptions q, CancellationToken ct = default(CancellationToken)); - Task> Node(string node, CancellationToken ct = default(CancellationToken)); - Task> Node(string node, QueryOptions q, CancellationToken ct = default(CancellationToken)); - Task> Nodes(CancellationToken ct = default(CancellationToken)); - Task> Nodes(QueryOptions q, CancellationToken ct = default(CancellationToken)); - Task Register(CatalogRegistration reg, CancellationToken ct = default(CancellationToken)); - Task Register(CatalogRegistration reg, WriteOptions q, CancellationToken ct = default(CancellationToken)); - Task> Service(string service, CancellationToken ct = default(CancellationToken)); - Task> Service(string service, string tag, CancellationToken ct = default(CancellationToken)); - Task> Service(string service, string tag, QueryOptions q, CancellationToken ct = default(CancellationToken)); - Task>> Services(CancellationToken ct = default(CancellationToken)); - Task>> Services(QueryOptions q, CancellationToken ct = default(CancellationToken)); + Task> Datacenters(CancellationToken ct = default); + Task> Datacenters(QueryOptions q, CancellationToken ct = default); + Task Deregister(CatalogDeregistration reg, CancellationToken ct = default); + Task Deregister(CatalogDeregistration reg, WriteOptions q, CancellationToken ct = default); + Task> Node(string node, CancellationToken ct = default); + Task> Node(string node, QueryOptions q, CancellationToken ct = default); + Task> Nodes(CancellationToken ct = default); + Task> Nodes(QueryOptions q, CancellationToken ct = default); + Task Register(CatalogRegistration reg, CancellationToken ct = default); + Task Register(CatalogRegistration reg, WriteOptions q, CancellationToken ct = default); + Task> Service(string service, CancellationToken ct = default); + Task> Service(string service, string tag, CancellationToken ct = default); + Task> Service(string service, string tag, QueryOptions q, CancellationToken ct = default); + Task>> Services(CancellationToken ct = default); + Task>> Services(QueryOptions q, CancellationToken ct = default); } } diff --git a/Consul/Lock.cs b/Consul/Lock.cs index 186278bc3..94d0f6494 100644 --- a/Consul/Lock.cs +++ b/Consul/Lock.cs @@ -614,7 +614,7 @@ public TimeSpan LockRetryTime /// /// When set to false, will block forever until the lock is acquired. LockWaitTime is ignored in this case. ///
- /// When set to true, the lock within a timestamp (It is analogous to SemaphoreSlim.Wait(Timespan timeout). + /// When set to true, the lock within a timestamp (It is analogous to SemaphoreSlim.Wait(Timespan timeout). /// Under the hood, it attempts to acquire the lock multiple times if needed (due to the HTTP Long Poll returning early), /// and will do so as many times as it can within the bounds set by LockWaitTime. /// If LockWaitTime is set to 0, there will be only single attempt to acquire the lock.