Skip to content

Commit

Permalink
Fix edgehub_queue_len counting (Azure#5692)
Browse files Browse the repository at this point in the history
This fixes the count of eh_queue_len by using only the metrics as counters and not setting through other means. We will only increment on propose and decrement on commit. We initialize the count based on how many entries are in the queue from the checkpoint's offset. We still update the count whenever we cleanup the queue.

NOTE
Below graph is with TTL of 7200 seconds and the change in color is when edgeHub was restarted.
  • Loading branch information
nyanzebra authored Oct 20, 2021
1 parent 676a0f5 commit d3f6498
Show file tree
Hide file tree
Showing 21 changed files with 236 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public IMessageIterator GetMessageIterator(string endpointId, long startingOffse

public IMessageIterator GetMessageIterator(string endpointId) => this.GetMessageIterator(endpointId, DefaultStartingOffset);

public Task<ulong> GetMessageCountFromOffset(string endpointId, long offset)
{
if (!this.endpointSequentialStores.TryGetValue(Preconditions.CheckNonWhiteSpace(endpointId, nameof(endpointId)), out ISequentialStore<MessageRef> sequentialStore))
{
throw new InvalidOperationException($"Endpoint {nameof(endpointId)} not found");
}

return sequentialStore.GetCountFromOffset(offset);
}

public void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -360,8 +370,10 @@ await message.ForEachAsync(async msg =>
}
}

// update Metrics for message counts
Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority.ToString());
// Since we will have the total *true* count of messages in the store,
// we need to set the counter here to the total count of messages in the store.
Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority);

totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,10 @@ public interface IMessageStore : IDisposable
/// Set the expiry time for messages in the store
/// </summary>
void SetTimeToLive(TimeSpan timeToLive);

/// <summary>
/// Returns the number of messages in the store from a offset
/// </summary>
Task<ulong> GetMessageCountFromOffset(string endpointId, long offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public class Checkpointer : ICheckpointer
this.Proposed = checkpointData.Offset;
this.closed = new AtomicBoolean(false);
this.EndpointId = endpointId;
this.Priority = priority.ToString();
this.Priority = priority;
}

public string Id { get; }

public string EndpointId { get; }

public string Priority { get; }
public uint Priority { get; }

public long Offset { get; private set; }

Expand Down Expand Up @@ -71,7 +71,7 @@ public static async Task<Checkpointer> CreateAsync(string id, ICheckpointStore s
public void Propose(IMessage message)
{
this.Proposed = Math.Max(message.Offset, this.Proposed);
Metrics.SetQueueLength(this);
Metrics.IncrementQueueLength(this.EndpointId, this.Priority);
}

public bool Admit(IMessage message)
Expand Down Expand Up @@ -111,7 +111,11 @@ public async Task CommitAsync(ICollection<IMessage> successful, ICollection<IMes
this.LastFailedRevivalTime = lastFailedRevivalTime;
this.UnhealthySince = unhealthySince;
await this.store.SetCheckpointDataAsync(this.Id, new CheckpointData(offset, this.LastFailedRevivalTime, this.UnhealthySince), token);
Metrics.SetQueueLength(this);
}

foreach (var message in successful)
{
Metrics.DecrementQueueLength(this.EndpointId, this.Priority);
}

Events.CommitFinished(this);
Expand Down Expand Up @@ -187,7 +191,7 @@ public static void CommitFinished(Checkpointer checkpointer)

public static void Close(Checkpointer checkpointer)
{
Log.LogInformation((int)EventIds.Close, "[CheckpointerClose] {conetxt}", GetContextString(checkpointer));
Log.LogInformation((int)EventIds.Close, "[CheckpointerClose] {context}", GetContextString(checkpointer));
}

static string GetContextString(Checkpointer checkpointer)
Expand All @@ -203,13 +207,13 @@ public static class Metrics
"Number of messages pending to be processed for the endpoint",
new List<string> { "endpoint", "priority", MetricsConstants.MsTelemetry });

public static void SetQueueLength(Checkpointer checkpointer) => SetQueueLength(CalculateQueueLength(checkpointer), checkpointer.EndpointId, checkpointer.Priority);
public static void IncrementQueueLength(string endpointId, uint priority) => QueueLength.Increment(new[] { endpointId, priority.ToString(), bool.TrueString });

public static void SetQueueLength(double length, string endpointId, string priority) => QueueLength.Set(length, new[] { endpointId, priority, bool.TrueString });
public static void DecrementQueueLength(string endpointId, uint priority) => QueueLength.Decrement(new[] { endpointId, priority.ToString(), bool.TrueString });

private static double CalculateQueueLength(Checkpointer checkpointer) => CalculateQueueLength(checkpointer.Proposed - checkpointer.Offset);
public static void SetQueueLength(ulong count, string endpointId, uint priority) => QueueLength.Set(count, new[] { endpointId, priority.ToString(), bool.TrueString });

private static double CalculateQueueLength(long length) => Math.Max(length, 0);
public static double GetQueueLength(string endpointId, uint priority) => QueueLength.Get(new[] { endpointId, priority.ToString(), bool.TrueString });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Edge.Util.Metrics;
using Microsoft.Azure.Devices.Routing.Core.Checkpointers;
using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
Expand Down Expand Up @@ -164,6 +165,17 @@ public async Task UpdatePriorities(IList<uint> priorities, Option<Endpoint> newE
ICheckpointer checkpointer = await this.checkpointerFactory.CreateAsync(id, this.Endpoint.Id, priority);
EndpointExecutorFsm fsm = new EndpointExecutorFsm(this.Endpoint, checkpointer, this.config);

// Get current known count from offset and initialize the queue length count.
// It is possible that there is no checkpoint for this priority yet, in which case
// the value will be InvalidOffset, so we need to initialize the queue length to 0 in that case.
var count = 0ul;
if (checkpointer.Offset != Checkpointer.InvalidOffset)
{
count = await this.messageStore.GetMessageCountFromOffset(id, checkpointer.Offset);
}

Checkpointer.Metrics.SetQueueLength(count, this.Endpoint.Id, priority);

// Add it to our dictionary
updatedSnapshot.Add(priority, fsm);
}
Expand Down
Loading

0 comments on commit d3f6498

Please sign in to comment.