Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/Testing/MetricsTests/is_system_endpoint_tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Shouldly;
using Wolverine.Runtime;
using Xunit;

namespace MetricsTests;

public class is_system_endpoint_tests
{
[Theory]
[InlineData("rabbitmq://localhost/wolverine.response.abc123", true)]
[InlineData("rabbitmq://localhost/wolverine.Response.ABC123", true)]
[InlineData("redis://localhost/wolverine.response.node1", true)]
[InlineData("local://replies", true)]
[InlineData("local://durable", true)]
[InlineData("rabbitmq://localhost/my-queue", false)]
[InlineData("tcp://localhost:5000", false)]
public void should_identify_system_endpoints(string uriString, bool expected)
{
var uri = new Uri(uriString);
WolverineRuntime.IsSystemEndpoint(uri).ShouldBe(expected);
}

[Fact]
public void null_uri_is_not_system_endpoint()
{
WolverineRuntime.IsSystemEndpoint(null).ShouldBeFalse();
}
}
126 changes: 126 additions & 0 deletions src/Testing/MetricsTests/sent_and_received_metrics_tracking.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using Shouldly;
using Wolverine.Runtime.Metrics;

namespace MetricsTests;

public class sent_and_received_metrics_tracking
{
[Fact]
public void record_sent_increments_per_tenant_tracking_sent()
{
var tracking = new PerTenantTracking("t1");
tracking.Sent.ShouldBe(0);

var record = new RecordSent("t1", "ServiceA");
record.Apply(tracking);

tracking.Sent.ShouldBe(1);

record.Apply(tracking);
record.Apply(tracking);

tracking.Sent.ShouldBe(3);
}

[Fact]
public void record_received_increments_per_tenant_tracking_received()
{
var tracking = new PerTenantTracking("t1");
tracking.Received.ShouldBe(0);

var record = new RecordReceived("t1", "ServiceB");
record.Apply(tracking);

tracking.Received.ShouldBe(1);

record.Apply(tracking);
record.Apply(tracking);

tracking.Received.ShouldBe(3);
}

[Fact]
public void compile_and_reset_includes_sent_and_received_then_resets()
{
var tracking = new PerTenantTracking("t1");
tracking.Sent = 5;
tracking.Received = 7;

var metrics = tracking.CompileAndReset();

metrics.TenantId.ShouldBe("t1");
metrics.Sent.ShouldBe(5);
metrics.Received.ShouldBe(7);

// After compile and reset, counters should be zero
tracking.Sent.ShouldBe(0);
tracking.Received.ShouldBe(0);
}

[Fact]
public void clear_resets_sent_and_received_to_zero()
{
var tracking = new PerTenantTracking("t1");
tracking.Sent = 10;
tracking.Received = 20;

tracking.Clear();

tracking.Sent.ShouldBe(0);
tracking.Received.ShouldBe(0);
}

[Fact]
public void per_tenant_metrics_sum_aggregates_sent_and_received()
{
var metrics1 = new PerTenantMetrics("t1",
new Executions(1, 100),
new EffectiveTime(1, 50),
Array.Empty<ExceptionCounts>(),
Sent: 3,
Received: 5);

var metrics2 = new PerTenantMetrics("t1",
new Executions(2, 200),
new EffectiveTime(2, 100),
Array.Empty<ExceptionCounts>(),
Sent: 7,
Received: 11);

var metrics3 = new PerTenantMetrics("t1",
new Executions(1, 50),
new EffectiveTime(1, 25),
Array.Empty<ExceptionCounts>(),
Sent: 2,
Received: 4);

var group = new[] { metrics1, metrics2, metrics3 }
.GroupBy(m => m.TenantId)
.Single();

var summed = PerTenantMetrics.Sum(group);

summed.TenantId.ShouldBe("t1");
summed.Sent.ShouldBe(12);
summed.Received.ShouldBe(20);
}

[Fact]
public void per_tenant_metrics_weight_multiplies_sent_and_received()
{
var metrics = new PerTenantMetrics("t1",
new Executions(2, 200),
new EffectiveTime(2, 100),
Array.Empty<ExceptionCounts>(),
Sent: 4,
Received: 6);

var weighted = metrics.Weight(3);

weighted.TenantId.ShouldBe("t1");
weighted.Sent.ShouldBe(12);
weighted.Received.ShouldBe(18);
weighted.Executions.Count.ShouldBe(6);
weighted.Executions.TotalTime.ShouldBe(600);
}
}
3 changes: 2 additions & 1 deletion src/Wolverine/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@
[assembly: InternalsVisibleTo("Wolverine.Kafka")]
[assembly: InternalsVisibleTo("Wolverine.Nats")]
[assembly: InternalsVisibleTo("Wolverine.Redis")]
[assembly: InternalsVisibleTo("Wolverine.Pubsub")]
[assembly: InternalsVisibleTo("Wolverine.Pubsub")]
[assembly: InternalsVisibleTo("MetricsTests")]
2 changes: 2 additions & 0 deletions src/Wolverine/MetricsConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class MetricsConstants
public const string MessageTypeKey = "message.type";
public const string MessageDestinationKey = "message.destination";
public const string TenantIdKey = "tenant.id";
public const string MessagesReceived = "wolverine-messages-received";
public const string MessagesFailed = "wolverine-execution-failure";
public const string ExceptionType = "exception.type";
public const string SourceKey = "source";
}
14 changes: 10 additions & 4 deletions src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public MessageHandlingMetrics Weight(int weight)
/// <param name="Executions">Handler execution count and total execution time in milliseconds.</param>
/// <param name="EffectiveTime">Message completion count and total end-to-end time in milliseconds.</param>
/// <param name="Exceptions">Per-exception-type counts of failures and dead letters, ordered alphabetically by type name.</param>
public record PerTenantMetrics(string TenantId, Executions Executions, EffectiveTime EffectiveTime, ExceptionCounts[] Exceptions)
/// <param name="Sent">The number of messages sent during this period.</param>
/// <param name="Received">The number of messages received from external transports during this period.</param>
public record PerTenantMetrics(string TenantId, Executions Executions, EffectiveTime EffectiveTime, ExceptionCounts[] Exceptions, int Sent = 0, int Received = 0)
{
/// <summary>
/// Sums a group of <see cref="PerTenantMetrics"/> records sharing the same tenant ID.
Expand All @@ -155,13 +157,15 @@ public static PerTenantMetrics Sum(IGrouping<string, PerTenantMetrics> group)
.GroupBy(e => e.ExceptionType)
.Select(ExceptionCounts.Sum)
.ToArray();
var sent = group.Sum(t => t.Sent);
var received = group.Sum(t => t.Received);

return new PerTenantMetrics(tenantId, executions, effectiveTime, exceptions);
return new PerTenantMetrics(tenantId, executions, effectiveTime, exceptions, sent, received);
}

/// <summary>
/// Multiplies all numeric values (execution counts and times, effective time counts and times,
/// and all exception failure/dead-letter counts) by the given weight.
/// sent/received counts, and all exception failure/dead-letter counts) by the given weight.
/// </summary>
/// <param name="weight">The multiplier to apply.</param>
/// <returns>A new weighted copy of this per-tenant snapshot.</returns>
Expand All @@ -170,7 +174,9 @@ public PerTenantMetrics Weight(int weight)
return new PerTenantMetrics(TenantId,
Executions.Weight(weight),
EffectiveTime.Weight(weight),
Exceptions.Select(e => e.Weight(weight)).ToArray());
Exceptions.Select(e => e.Weight(weight)).ToArray(),
Sent * weight,
Received * weight);
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,7 @@ public void Start()
foreach (var accumulator in _accumulators)
{
var metrics = accumulator.TriggerExport(_runtime.DurabilitySettings.AssignedNodeNumber);

if (metrics.PerTenant.Length > 0)
{
_runtime.Observer.MessageHandlingMetricsExported(metrics);
}
_runtime.Observer.MessageHandlingMetricsExported(metrics);
}
}
catch (Exception e)
Expand Down
16 changes: 15 additions & 1 deletion src/Wolverine/Runtime/Metrics/PerTenantTracking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public PerTenantTracking(string tenantId)
/// </summary>
public double TotalEffectiveTime { get; set; }

/// <summary>
/// The number of messages sent. Incremented by <see cref="RecordSent"/>.
/// </summary>
public int Sent { get; set; }

/// <summary>
/// The number of messages received from external transports. Incremented by <see cref="RecordReceived"/>.
/// </summary>
public int Received { get; set; }

/// <summary>
/// Dead-letter counts keyed by fully-qualified exception type name. Incremented
/// by <see cref="RecordDeadLetter"/>.
Expand Down Expand Up @@ -79,7 +89,9 @@ public PerTenantMetrics CompileAndReset()
Failures.TryGetValue(exceptionType, out failures);

return new ExceptionCounts(exceptionType, failures, deadLetters);
}).ToArray()
}).ToArray(),
Sent,
Received
);

Clear();
Expand All @@ -96,6 +108,8 @@ public void Clear()
TotalExecutionTime = 0;
Completions = 0;
TotalEffectiveTime = 0;
Sent = 0;
Received = 0;
DeadLetterCounts.Clear();
Failures.Clear();
}
Expand Down
18 changes: 18 additions & 0 deletions src/Wolverine/Runtime/Metrics/RecordReceived.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Wolverine.Runtime.Metrics;

/// <summary>
/// Records that a message was received from an external transport. Posted when
/// <c>IMessageTracker.Received()</c> is called for non-local, non-stub transports.
/// Increments <see cref="PerTenantTracking.Received"/> by one. The <see cref="Source"/> field
/// carries the ServiceName of the receiving application for cross-service flow visualization.
/// </summary>
/// <param name="TenantId">The tenant identifier from the envelope.</param>
/// <param name="Source">The ServiceName of the application that received the message.</param>
public record RecordReceived(string TenantId, string Source) : IHandlerMetricsData
{
/// <inheritdoc />
public void Apply(PerTenantTracking tracking)
{
tracking.Received++;
}
}
17 changes: 17 additions & 0 deletions src/Wolverine/Runtime/Metrics/RecordSent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Wolverine.Runtime.Metrics;

/// <summary>
/// Records that a message was sent. Posted when <c>IMessageTracker.Sent()</c> is called.
/// Increments <see cref="PerTenantTracking.Sent"/> by one. The <see cref="Source"/> field
/// carries the ServiceName of the sending application for cross-service flow visualization.
/// </summary>
/// <param name="TenantId">The tenant identifier from the envelope.</param>
/// <param name="Source">The ServiceName of the application that sent the message.</param>
public record RecordSent(string TenantId, string Source) : IHandlerMetricsData
{
/// <inheritdoc />
public void Apply(PerTenantTracking tracking)
{
tracking.Sent++;
}
}
3 changes: 2 additions & 1 deletion src/Wolverine/Runtime/Routing/MessageRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public MessageRoute(Type messageType, Endpoint endpoint, IWolverineRuntime runti
}
else
{
Sender = endpoint.Agent ?? throw new ArgumentOutOfRangeException(nameof(endpoint), $"Endpoint {endpoint.Uri} does not have an active sending agent. Message type: {messageType.FullNameInCode()}");
// Might need to force it to build out the sending agent if this is the first time it's been used
Sender = endpoint.Agent ?? runtime.Endpoints.GetOrBuildSendingAgent(endpoint.Uri) ?? throw new ArgumentOutOfRangeException(nameof(endpoint), $"Endpoint {endpoint.Uri} does not have an active sending agent. Message type: {messageType.FullNameInCode()}");
}

IsLocal = endpoint is LocalQueue;
Expand Down
18 changes: 17 additions & 1 deletion src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using JasperFx.Blocks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Logging;
using Wolverine.Logging;
Expand Down Expand Up @@ -39,7 +40,12 @@ public void LogStatus(string message)

public void Sent(Envelope envelope)
{
// I think we'll have a different mechanism for this
if (envelope.MessageType.IsNotEmpty() && !IsSystemEndpoint(envelope.Destination))
{
_runtime._accumulator.Value.FindAccumulator(envelope.GetMessageTypeName(), envelope.Destination)
.EntryPoint.Post(new RecordSent(envelope.TenantId, _serviceName));
}

_runtime.ActiveSession?.MaybeRecord(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId);
_sent(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id,
envelope.Destination?.ToString() ?? string.Empty,
Expand All @@ -48,6 +54,16 @@ public void Sent(Envelope envelope)

public void Received(Envelope envelope)
{
var isExternal = envelope.Destination != null
&& !envelope.Destination.Scheme.EqualsIgnoreCase("local")
&& !envelope.Destination.Scheme.EqualsIgnoreCase("stub");

if (isExternal && envelope.MessageType.IsNotEmpty() && !IsSystemEndpoint(envelope.Destination))
{
_runtime._accumulator.Value.FindAccumulator(envelope.GetMessageTypeName(), envelope.Destination)
.EntryPoint.Post(new RecordReceived(envelope.TenantId, _serviceName));
}

_runtime.ActiveSession?.Record(MessageEventType.Received, envelope, _serviceName, _uniqueNodeId);
_received(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id,
envelope.Destination?.ToString() ?? string.Empty,
Expand Down
Loading
Loading