From d3f6498860911cbbee2f3f71bf91b821c9a89e01 Mon Sep 17 00:00:00 2001 From: Robert T Jang Date: Wed, 20 Oct 2021 15:03:00 -0700 Subject: [PATCH] Fix edgehub_queue_len counting (#5692) This fixes the count of eh_queue_len by using only the metrics as counters and not setting through other means. We will only increment on propose and decrement on commit. We initialize the count based on how many entries are in the queue from the checkpoint's offset. We still update the count whenever we cleanup the queue. NOTE Below graph is with TTL of 7200 seconds and the change in color is when edgeHub was restarted. --- .../storage/MessageStore.cs | 16 +- .../IMessageStore.cs | 5 + .../checkpointers/Checkpointer.cs | 22 ++- .../endpoints/StoringAsyncEndpointExecutor.cs | 12 ++ .../StoringAsyncEndpointExecutorTest.cs | 180 +++++++++--------- .../ColumnFamilyDbStore.cs | 24 ++- .../DbStoreDecorator.cs | 2 + .../EncryptedStore.cs | 2 + .../EntityStore.cs | 2 + .../IKeyValueStore.cs | 2 + .../ISequentialStore.cs | 2 + .../InMemoryDbStore.cs | 2 + .../KeyValueStoreMapper.cs | 2 + .../NullKeyValueStore.cs | 2 + .../SequentialStore.cs | 2 + .../TimedEntityStore.cs | 2 + .../metrics/IMetricsGauge.cs | 6 + .../metrics/appMetrics/MetricsGauge.cs | 8 + .../metrics/nullMetrics/NullMetricsGauge.cs | 10 + .../metrics/prometheus.net/MetricsGauge.cs | 6 + .../ColumnFamilyStoreTest.cs | 31 +++ 21 files changed, 236 insertions(+), 104 deletions(-) diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs index c0a632a92bc..fd1f032c46f 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs @@ -134,6 +134,16 @@ public IMessageIterator GetMessageIterator(string endpointId, long startingOffse public IMessageIterator GetMessageIterator(string endpointId) => this.GetMessageIterator(endpointId, DefaultStartingOffset); + public Task GetMessageCountFromOffset(string endpointId, long offset) + { + if (!this.endpointSequentialStores.TryGetValue(Preconditions.CheckNonWhiteSpace(endpointId, nameof(endpointId)), out ISequentialStore sequentialStore)) + { + throw new InvalidOperationException($"Endpoint {nameof(endpointId)} not found"); + } + + return sequentialStore.GetCountFromOffset(offset); + } + public void Dispose() { this.Dispose(true); @@ -360,8 +370,10 @@ await message.ForEachAsync(async msg => } } - // update Metrics for message counts - Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority.ToString()); + // Since we will have the total *true* count of messages in the store, + // we need to set the counter here to the total count of messages in the store. + Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority); + totalCleanupCount += cleanupCount; totalCleanupStoreCount += cleanupEntityStoreCount; Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount); diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs index dde78c795b4..4f06a598da6 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs @@ -46,5 +46,10 @@ public interface IMessageStore : IDisposable /// Set the expiry time for messages in the store /// void SetTimeToLive(TimeSpan timeToLive); + + /// + /// Returns the number of messages in the store from a offset + /// + Task GetMessageCountFromOffset(string endpointId, long offset); } } diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs index 388b81a4a9f..895daf5ad40 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs @@ -33,14 +33,14 @@ public class Checkpointer : ICheckpointer this.Proposed = checkpointData.Offset; this.closed = new AtomicBoolean(false); this.EndpointId = endpointId; - this.Priority = priority.ToString(); + this.Priority = priority; } public string Id { get; } public string EndpointId { get; } - public string Priority { get; } + public uint Priority { get; } public long Offset { get; private set; } @@ -71,7 +71,7 @@ public static async Task CreateAsync(string id, ICheckpointStore s public void Propose(IMessage message) { this.Proposed = Math.Max(message.Offset, this.Proposed); - Metrics.SetQueueLength(this); + Metrics.IncrementQueueLength(this.EndpointId, this.Priority); } public bool Admit(IMessage message) @@ -111,7 +111,11 @@ public async Task CommitAsync(ICollection successful, ICollection { "endpoint", "priority", MetricsConstants.MsTelemetry }); - public static void SetQueueLength(Checkpointer checkpointer) => SetQueueLength(CalculateQueueLength(checkpointer), checkpointer.EndpointId, checkpointer.Priority); + public static void IncrementQueueLength(string endpointId, uint priority) => QueueLength.Increment(new[] { endpointId, priority.ToString(), bool.TrueString }); - public static void SetQueueLength(double length, string endpointId, string priority) => QueueLength.Set(length, new[] { endpointId, priority, bool.TrueString }); + public static void DecrementQueueLength(string endpointId, uint priority) => QueueLength.Decrement(new[] { endpointId, priority.ToString(), bool.TrueString }); - private static double CalculateQueueLength(Checkpointer checkpointer) => CalculateQueueLength(checkpointer.Proposed - checkpointer.Offset); + public static void SetQueueLength(ulong count, string endpointId, uint priority) => QueueLength.Set(count, new[] { endpointId, priority.ToString(), bool.TrueString }); - private static double CalculateQueueLength(long length) => Math.Max(length, 0); + public static double GetQueueLength(string endpointId, uint priority) => QueueLength.Get(new[] { endpointId, priority.ToString(), bool.TrueString }); } } } diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs index bf39324e357..75da4123baa 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Concurrency; using Microsoft.Azure.Devices.Edge.Util.Metrics; + using Microsoft.Azure.Devices.Routing.Core.Checkpointers; using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine; using Microsoft.Extensions.Logging; using Nito.AsyncEx; @@ -164,6 +165,17 @@ public async Task UpdatePriorities(IList priorities, Option newE ICheckpointer checkpointer = await this.checkpointerFactory.CreateAsync(id, this.Endpoint.Id, priority); EndpointExecutorFsm fsm = new EndpointExecutorFsm(this.Endpoint, checkpointer, this.config); + // Get current known count from offset and initialize the queue length count. + // It is possible that there is no checkpoint for this priority yet, in which case + // the value will be InvalidOffset, so we need to initialize the queue length to 0 in that case. + var count = 0ul; + if (checkpointer.Offset != Checkpointer.InvalidOffset) + { + count = await this.messageStore.GetMessageCountFromOffset(id, checkpointer.Offset); + } + + Checkpointer.Metrics.SetQueueLength(count, this.Endpoint.Id, priority); + // Add it to our dictionary updatedSnapshot.Add(priority, fsm); } diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs index fe88839e7a1..4c538a73482 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs @@ -6,8 +6,8 @@ namespace Microsoft.Azure.Devices.Routing.Core.Test.Endpoints using System.Collections.Generic; using System.Linq; using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Devices.Edge.Util; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Concurrency; using Microsoft.Azure.Devices.Edge.Util.Test.Common; using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling; @@ -337,64 +337,64 @@ public async Task MessagePrioritiesTest() var storingAsyncEndpointExecutor = new StoringAsyncEndpointExecutor(endpoint, checkpointerFactory, endpointExecutorConfig, asyncEndpointExecutorOptions, messageStore); await storingAsyncEndpointExecutor.UpdatePriorities(priorities, Option.None()); - var normalPriMsg1 = new Message(TelemetryMessageSource.Instance, new byte[] { 1 }, new Dictionary { { "normalPriority", string.Empty } }, 0L); - var normalPriMsg2 = new Message(TelemetryMessageSource.Instance, new byte[] { 2 }, new Dictionary { { "normalPriority", string.Empty } }, 1L); - var normalPriMsg3 = new Message(TelemetryMessageSource.Instance, new byte[] { 3 }, new Dictionary { { "normalPriority", string.Empty } }, 2L); - var lowPriMsg1 = new Message(TelemetryMessageSource.Instance, new byte[] { 4 }, new Dictionary { { "lowPriority", string.Empty } }, 3L); - var lowPriMsg2 = new Message(TelemetryMessageSource.Instance, new byte[] { 5 }, new Dictionary { { "lowPriority", string.Empty } }, 4L); - var highPriMsg1 = new Message(TelemetryMessageSource.Instance, new byte[] { 6 }, new Dictionary { { "highPriority", string.Empty } }, 5L); - var normalPriMsg4 = new Message(TelemetryMessageSource.Instance, new byte[] { 7 }, new Dictionary { { "normalPriority", string.Empty } }, 6L); - var highPriMsg2 = new Message(TelemetryMessageSource.Instance, new byte[] { 8 }, new Dictionary { { "highPriority", string.Empty } }, 7L); - const int HighPriCount = 2; - const int NormalPriCount = 4; - const int LowPriCount = 2; + var normalPriMsg1 = new Message(TelemetryMessageSource.Instance, new byte[] { 1 }, new Dictionary { { "normalPriority", string.Empty } }, 0L); + var normalPriMsg2 = new Message(TelemetryMessageSource.Instance, new byte[] { 2 }, new Dictionary { { "normalPriority", string.Empty } }, 1L); + var normalPriMsg3 = new Message(TelemetryMessageSource.Instance, new byte[] { 3 }, new Dictionary { { "normalPriority", string.Empty } }, 2L); + var lowPriMsg1 = new Message(TelemetryMessageSource.Instance, new byte[] { 4 }, new Dictionary { { "lowPriority", string.Empty } }, 3L); + var lowPriMsg2 = new Message(TelemetryMessageSource.Instance, new byte[] { 5 }, new Dictionary { { "lowPriority", string.Empty } }, 4L); + var highPriMsg1 = new Message(TelemetryMessageSource.Instance, new byte[] { 6 }, new Dictionary { { "highPriority", string.Empty } }, 5L); + var normalPriMsg4 = new Message(TelemetryMessageSource.Instance, new byte[] { 7 }, new Dictionary { { "normalPriority", string.Empty } }, 6L); + var highPriMsg2 = new Message(TelemetryMessageSource.Instance, new byte[] { 8 }, new Dictionary { { "highPriority", string.Empty } }, 7L); + const int HighPriCount = 2; + const int NormalPriCount = 4; + const int LowPriCount = 2; // Disable the endpoint so messages are stuck in queue endpoint.CanProcess = false; - // Send normal priority messages - await storingAsyncEndpointExecutor.Invoke(normalPriMsg1, NormalPri, 3600); - await storingAsyncEndpointExecutor.Invoke(normalPriMsg2, NormalPri, 3600); - await storingAsyncEndpointExecutor.Invoke(normalPriMsg3, NormalPri, 3600); - - // Send low priority messages - await storingAsyncEndpointExecutor.Invoke(lowPriMsg1, LowPri, 3600); - await storingAsyncEndpointExecutor.Invoke(lowPriMsg2, LowPri, 3600); - - // Send the remaining messages mixed priority - await storingAsyncEndpointExecutor.Invoke(highPriMsg1, HighPri, 3600); - await storingAsyncEndpointExecutor.Invoke(normalPriMsg4, NormalPri, 3600); - await storingAsyncEndpointExecutor.Invoke(highPriMsg2, HighPri, 3600); - - // Message store should have the messages in the corresponding queues - var highPriQueue = messageStore.GetReceivedMessagesForEndpoint($"{endpoint.Id}_Pri{HighPri}"); - Assert.Equal(2, highPriQueue.Count); - Assert.Contains(highPriMsg1, highPriQueue); - Assert.Contains(highPriMsg2, highPriQueue); - - var normalPriQueue = messageStore.GetReceivedMessagesForEndpoint($"{endpoint.Id}_Pri{NormalPri}"); - Assert.Equal(4, normalPriQueue.Count); - Assert.Contains(normalPriMsg1, normalPriQueue); - Assert.Contains(normalPriMsg2, normalPriQueue); - Assert.Contains(normalPriMsg3, normalPriQueue); - Assert.Contains(normalPriMsg4, normalPriQueue); - - var lowPriQueue = messageStore.GetReceivedMessagesForEndpoint($"{endpoint.Id}_Pri{LowPri}"); - Assert.Equal(2, lowPriQueue.Count); - Assert.Contains(lowPriMsg1, lowPriQueue); - Assert.Contains(lowPriMsg2, lowPriQueue); - - // Re-enable the endpoint and let the queues drain + // Send normal priority messages + await storingAsyncEndpointExecutor.Invoke(normalPriMsg1, NormalPri, 3600); + await storingAsyncEndpointExecutor.Invoke(normalPriMsg2, NormalPri, 3600); + await storingAsyncEndpointExecutor.Invoke(normalPriMsg3, NormalPri, 3600); + + // Send low priority messages + await storingAsyncEndpointExecutor.Invoke(lowPriMsg1, LowPri, 3600); + await storingAsyncEndpointExecutor.Invoke(lowPriMsg2, LowPri, 3600); + + // Send the remaining messages mixed priority + await storingAsyncEndpointExecutor.Invoke(highPriMsg1, HighPri, 3600); + await storingAsyncEndpointExecutor.Invoke(normalPriMsg4, NormalPri, 3600); + await storingAsyncEndpointExecutor.Invoke(highPriMsg2, HighPri, 3600); + + // Message store should have the messages in the corresponding queues + var highPriQueue = messageStore.GetReceivedMessagesForEndpoint($"{endpoint.Id}_Pri{HighPri}"); + Assert.Equal(2, highPriQueue.Count); + Assert.Contains(highPriMsg1, highPriQueue); + Assert.Contains(highPriMsg2, highPriQueue); + + var normalPriQueue = messageStore.GetReceivedMessagesForEndpoint($"{endpoint.Id}_Pri{NormalPri}"); + Assert.Equal(4, normalPriQueue.Count); + Assert.Contains(normalPriMsg1, normalPriQueue); + Assert.Contains(normalPriMsg2, normalPriQueue); + Assert.Contains(normalPriMsg3, normalPriQueue); + Assert.Contains(normalPriMsg4, normalPriQueue); + + var lowPriQueue = messageStore.GetReceivedMessagesForEndpoint($"{endpoint.Id}_Pri{LowPri}"); + Assert.Equal(2, lowPriQueue.Count); + Assert.Contains(lowPriMsg1, lowPriQueue); + Assert.Contains(lowPriMsg2, lowPriQueue); + + // Re-enable the endpoint and let the queues drain endpoint.CanProcess = true; int retryAttempts = 0; int count = endpoint.Processed.Count(); - while (count != 8) - { - Assert.True(count < 8); - await Task.Delay(TimeSpan.FromSeconds(3)); - retryAttempts++; - Assert.True(retryAttempts < 8, "Too many retry attempts. Failed because test is taking too long."); - count = endpoint.Processed.Count(); + while (count != 8) + { + Assert.True(count < 8); + await Task.Delay(TimeSpan.FromSeconds(3)); + retryAttempts++; + Assert.True(retryAttempts < 8, "Too many retry attempts. Failed because test is taking too long."); + count = endpoint.Processed.Count(); } // Assert - Make sure the endpoint received all the messages @@ -408,45 +408,45 @@ public async Task MessagePrioritiesTest() for (int i = 0; i < endpoint.Processed.Count(); i++) { IMessage message = endpoint.Processed[i]; - if (message.Properties.ContainsKey($"highPriority")) - { - if (++highPriMessagesProcessed == HighPriCount) - { - // Found all the high-pri messages, - // normal and low pri at this point - // must not have completed yet - Assert.True(normalPriMessagesProcessed < NormalPriCount); - Assert.True(lowPriMessagesProcessed < LowPriCount); - } + if (message.Properties.ContainsKey($"highPriority")) + { + if (++highPriMessagesProcessed == HighPriCount) + { + // Found all the high-pri messages, + // normal and low pri at this point + // must not have completed yet + Assert.True(normalPriMessagesProcessed < NormalPriCount); + Assert.True(lowPriMessagesProcessed < LowPriCount); + } } - else if (message.Properties.ContainsKey($"normalPriority")) - { - if (++normalPriMessagesProcessed == NormalPriCount) - { - // Found all the normal-pri messages, - // low pri messages at this point must - // not have completed yet - Assert.True(lowPriMessagesProcessed < LowPriCount); - - // High pri messages should have completed - Assert.True(highPriMessagesProcessed == HighPriCount); - } + else if (message.Properties.ContainsKey($"normalPriority")) + { + if (++normalPriMessagesProcessed == NormalPriCount) + { + // Found all the normal-pri messages, + // low pri messages at this point must + // not have completed yet + Assert.True(lowPriMessagesProcessed < LowPriCount); + + // High pri messages should have completed + Assert.True(highPriMessagesProcessed == HighPriCount); + } } - else if (message.Properties.ContainsKey($"lowPriority")) - { - if (++lowPriMessagesProcessed == LowPriCount) - { - // Found all the low-pri messages, - // high-pri and normal-pri should also - // have completed before this - Assert.True(highPriMessagesProcessed == HighPriCount); - Assert.True(normalPriMessagesProcessed == NormalPriCount); - } + else if (message.Properties.ContainsKey($"lowPriority")) + { + if (++lowPriMessagesProcessed == LowPriCount) + { + // Found all the low-pri messages, + // high-pri and normal-pri should also + // have completed before this + Assert.True(highPriMessagesProcessed == HighPriCount); + Assert.True(normalPriMessagesProcessed == NormalPriCount); + } } - else - { - // Bad test setup - Assert.True(false, "Bad test setup, processed a message with unexpected priority"); + else + { + // Bad test setup + Assert.True(false, "Bad test setup, processed a message with unexpected priority"); } } } @@ -531,6 +531,8 @@ public Task RemoveEndpoint(string endpointId) public List GetReceivedMessagesForEndpoint(string endpointId) => this.GetQueue(endpointId).Queue; + public Task GetMessageCountFromOffset(string endpointId, long offset) => Task.FromResult(0ul); + TestMessageQueue GetQueue(string endpointId) => this.endpointQueues.GetOrAdd(endpointId, new TestMessageQueue()); class TestMessageQueue : IMessageIterator diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage.RocksDb/ColumnFamilyDbStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage.RocksDb/ColumnFamilyDbStore.cs index cbd3feefd65..21e483f32f2 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage.RocksDb/ColumnFamilyDbStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage.RocksDb/ColumnFamilyDbStore.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb class ColumnFamilyDbStore : IDbStore { readonly IRocksDb db; - private ulong count; + private long count; public ColumnFamilyDbStore(IRocksDb db, ColumnFamilyHandle handle) { @@ -20,6 +20,7 @@ public ColumnFamilyDbStore(IRocksDb db, ColumnFamilyHandle handle) this.Handle = Preconditions.CheckNotNull(handle, nameof(handle)); var iterator = db.NewIterator(this.Handle); + iterator.SeekToFirst(); this.count = 0; while (iterator.Valid()) { @@ -65,7 +66,7 @@ public async Task Put(byte[] key, byte[] value, CancellationToken cancellationTo Action operation = () => this.db.Put(key, value, this.Handle); await operation.ExecuteUntilCancelled(cancellationToken); - this.count += 1; + Interlocked.Increment(ref this.count); } public async Task Remove(byte[] key, CancellationToken cancellationToken) @@ -74,7 +75,7 @@ public async Task Remove(byte[] key, CancellationToken cancellationToken) Action operation = () => this.db.Remove(key, this.Handle); await operation.ExecuteUntilCancelled(cancellationToken); - this.count -= 1; + Interlocked.Decrement(ref this.count); } public async Task> GetLastEntry(CancellationToken cancellationToken) @@ -140,7 +141,22 @@ public Task IterateBatch(int batchSize, Func callback, Can return this.IterateBatch(iterator => iterator.SeekToFirst(), batchSize, callback, cancellationToken); } - public Task Count() => Task.FromResult(this.count); + public Task Count() => Task.FromResult((ulong)Interlocked.Read(ref this.count)); + + public Task GetCountFromOffset(byte[] offset) + { + var iterator = this.db.NewIterator(this.Handle); + iterator.Seek(offset); + + ulong count = 0; + while (iterator.Valid()) + { + count += 1; + iterator = iterator.Next(); + } + + return Task.FromResult(count); + } public void Dispose() { diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/DbStoreDecorator.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/DbStoreDecorator.cs index eec5098acfe..295fddb3701 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/DbStoreDecorator.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/DbStoreDecorator.cs @@ -104,5 +104,7 @@ public Task Remove(byte[] key, CancellationToken cancellationToken) } public Task Count() => this.dbStore.Count(); + + public Task GetCountFromOffset(byte[] offset) => this.dbStore.GetCountFromOffset(offset); } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EncryptedStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EncryptedStore.cs index f7bfee73ef9..8575bf00689 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EncryptedStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EncryptedStore.cs @@ -122,6 +122,8 @@ await decryptedValue.ForEachAsync( public Task Count() => this.entityStore.Count(); + public Task GetCountFromOffset(TK offset) => this.entityStore.GetCountFromOffset(offset); + public void Dispose() { this.Dispose(true); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EntityStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EntityStore.cs index 84ef7e97063..f4999c7c87f 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EntityStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EntityStore.cs @@ -152,6 +152,8 @@ public Task Contains(TK key, CancellationToken cancellationToken) public Task Count() => this.dbStore.Count(); + public Task GetCountFromOffset(TK offset) => this.dbStore.GetCountFromOffset(offset); + public void Dispose() { this.Dispose(true); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/IKeyValueStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/IKeyValueStore.cs index ebfafab4a01..0aa441788c9 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/IKeyValueStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/IKeyValueStore.cs @@ -44,5 +44,7 @@ public interface IKeyValueStore : IDisposable Task IterateBatch(TK startKey, int batchSize, Func perEntityCallback, CancellationToken cancellationToken); Task Count(); + + Task GetCountFromOffset(TK offset); } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs index 060f71e66cc..ddd134e7ecf 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs @@ -34,5 +34,7 @@ public interface ISequentialStore : IDisposable Task> GetBatch(long startingOffset, int batchSize, CancellationToken cancellationToken); Task Count(); + + Task GetCountFromOffset(long offset); } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/InMemoryDbStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/InMemoryDbStore.cs index d4d79904df2..8a30738b12a 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/InMemoryDbStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/InMemoryDbStore.cs @@ -122,6 +122,8 @@ public async Task Remove(byte[] key, CancellationToken cancellationToken) public Task Count() => Task.FromResult((ulong)this.keyValues.Count); + public Task GetCountFromOffset(byte[] offset) => throw new NotImplementedException(); + public void Dispose() { // No-op diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/KeyValueStoreMapper.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/KeyValueStoreMapper.cs index 2ed233aa243..201f9357f59 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/KeyValueStoreMapper.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/KeyValueStoreMapper.cs @@ -76,6 +76,8 @@ public Task IterateBatch(int batchSize, Func callback, Cancellatio public Task Count() => this.underlyingStore.Count(); + public Task GetCountFromOffset(TK offset) => this.underlyingStore.GetCountFromOffset(this.keyMapper.From(offset)); + Task IterateBatch(Option startKey, int batchSize, Func callback, CancellationToken cancellationToken) { Preconditions.CheckRange(batchSize, 1, nameof(batchSize)); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/NullKeyValueStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/NullKeyValueStore.cs index fc648a3268c..424bec2fad9 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/NullKeyValueStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/NullKeyValueStore.cs @@ -45,5 +45,7 @@ public void Dispose() public Task IterateBatch(TK startKey, int batchSize, Func perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask; public Task Count() => Task.FromResult(0UL); + + public Task GetCountFromOffset(TK offset) => Task.FromResult(0UL); } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs index 99390010a15..9103764fd0a 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs @@ -190,6 +190,8 @@ await this.entityStore.IterateBatch( public Task Count() => this.entityStore.Count(); + public Task GetCountFromOffset(long offset) => this.entityStore.GetCountFromOffset(StoreUtils.GetKeyFromOffset(offset)); + public void Dispose() { this.Dispose(true); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/TimedEntityStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/TimedEntityStore.cs index 5ecbeb92a3f..1b38b32f97b 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/TimedEntityStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/TimedEntityStore.cs @@ -131,5 +131,7 @@ public Task IterateBatch(TK startKey, int batchSize, Func perEntit } public Task Count() => this.underlyingKeyValueStore.Count(); + + public Task GetCountFromOffset(TK offset) => this.underlyingKeyValueStore.GetCountFromOffset(offset); } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/IMetricsGauge.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/IMetricsGauge.cs index 22de0087f8c..12456f9664d 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/IMetricsGauge.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/IMetricsGauge.cs @@ -3,6 +3,12 @@ namespace Microsoft.Azure.Devices.Edge.Util.Metrics { public interface IMetricsGauge { + double Get(string[] labelValues); + void Set(double value, string[] labelValues); + + void Increment(string[] labelValues); + + void Decrement(string[] labelValues); } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/appMetrics/MetricsGauge.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/appMetrics/MetricsGauge.cs index b395b1ed470..4279bb7d4b1 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/appMetrics/MetricsGauge.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/appMetrics/MetricsGauge.cs @@ -6,6 +6,8 @@ namespace Microsoft.Azure.Devices.Edge.Util.Metrics.AppMetrics using App.Metrics; using App.Metrics.Gauge; + // NOTE: AppMetrics is not used and doesn't support inc/dec on gauge natively. + // We currently only use the prometheus version of gauage. public class MetricsGauge : BaseMetric, IMetricsGauge { readonly IMeasureGaugeMetrics gaugeMetrics; @@ -22,6 +24,12 @@ public MetricsGauge(string name, IMeasureGaugeMetrics gaugeMetrics, List }; } + public void Decrement(string[] labelValues) => throw new System.NotImplementedException(); + + public double Get(string[] labelValues) => throw new System.NotImplementedException(); + + public void Increment(string[] labelValues) => throw new System.NotImplementedException(); + public void Set(double value, string[] labelValues) { var tags = new MetricTags(this.LabelNames, labelValues); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/nullMetrics/NullMetricsGauge.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/nullMetrics/NullMetricsGauge.cs index 08bcebded83..2f04e8bd0b9 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/nullMetrics/NullMetricsGauge.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/nullMetrics/NullMetricsGauge.cs @@ -3,8 +3,18 @@ namespace Microsoft.Azure.Devices.Edge.Util.Metrics.NullMetrics { public class NullMetricsGauge : IMetricsGauge { + public double Get(string[] labelValues) => 0; + public void Set(double value, string[] labelValues) { } + + public void Increment(string[] labelValues) + { + } + + public void Decrement(string[] labelValues) + { + } } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/prometheus.net/MetricsGauge.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/prometheus.net/MetricsGauge.cs index ab97f4b4401..96f7b38e490 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/prometheus.net/MetricsGauge.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/prometheus.net/MetricsGauge.cs @@ -14,9 +14,15 @@ public MetricsGauge(string name, string description, List labelNames, Li this.gauge = Metrics.CreateGauge(name, description, labelNames.ToArray()); } + public double Get(string[] labelValues) => this.gauge.WithLabels(this.GetLabelValues(labelValues)).Value; + public void Set(double value, string[] labelValues) => this.gauge .WithLabels(this.GetLabelValues(labelValues)) .Set(value); + + public void Increment(string[] labelValues) => this.gauge.WithLabels(this.GetLabelValues(labelValues)).Inc(); + + public void Decrement(string[] labelValues) => this.gauge.WithLabels(this.GetLabelValues(labelValues)).Dec(); } } diff --git a/edge-util/test/Microsoft.Azure.Devices.Edge.Storage.RocksDb.Test/ColumnFamilyStoreTest.cs b/edge-util/test/Microsoft.Azure.Devices.Edge.Storage.RocksDb.Test/ColumnFamilyStoreTest.cs index ad68c8cfa89..9baae0a690b 100644 --- a/edge-util/test/Microsoft.Azure.Devices.Edge.Storage.RocksDb.Test/ColumnFamilyStoreTest.cs +++ b/edge-util/test/Microsoft.Azure.Devices.Edge.Storage.RocksDb.Test/ColumnFamilyStoreTest.cs @@ -72,5 +72,36 @@ public async Task FirstLastTest() Assert.Equal(lastKey, lastEntry.key.FromBytes()); Assert.Equal(lastValue, lastEntry.value.FromBytes()); } + + [Fact] + public async Task MessageCountTest() + { + using (IDbStore columnFamilyDbStore = this.rocksDbStoreProvider.GetDbStore("test")) + { + Assert.Equal(0ul, await columnFamilyDbStore.Count()); + + for (int i = 0; i < 10; i++) + { + string key = $"key{i}"; + string value = "$value{i}"; + await columnFamilyDbStore.Put(key.ToBytes(), value.ToBytes()); + } + + Assert.Equal(10ul, await columnFamilyDbStore.Count()); + } + + using (IDbStore columnFamilyDbStore = this.rocksDbStoreProvider.GetDbStore("test")) + { + Assert.Equal(10ul, await columnFamilyDbStore.Count()); + + for (int i = 0; i < 10; i++) + { + string key = $"key{i}"; + await columnFamilyDbStore.Remove(key.ToBytes()); + } + + Assert.Equal(0ul, await columnFamilyDbStore.Count()); + } + } } }