-
Notifications
You must be signed in to change notification settings - Fork 838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add WebSockets telemetry #1237
Merged
Merged
Add WebSockets telemetry #1237
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
c11308f
Add WebSockets telemetry
MihaZupan d97a4b9
Simplify test
MihaZupan 7a72f25
Add IClock, don't count control frames
MihaZupan 900ff00
Add WebSocketsParser unit tests
MihaZupan 177bd96
Only inspect WebSocket streams
MihaZupan e7716fe
Test control frames with payloads
MihaZupan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
samples/ReverseProxy.Metrics.Sample/WebSocketsTelemetryConsumer.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
using System; | ||
using Microsoft.Extensions.Logging; | ||
using Yarp.Telemetry.Consumption; | ||
|
||
namespace Yarp.Sample | ||
{ | ||
public sealed class WebSocketsTelemetryConsumer : IWebSocketsTelemetryConsumer | ||
{ | ||
private readonly ILogger<WebSocketsTelemetryConsumer> _logger; | ||
|
||
public WebSocketsTelemetryConsumer(ILogger<WebSocketsTelemetryConsumer> logger) | ||
{ | ||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||
} | ||
|
||
public void OnWebSocketClosed(DateTime timestamp, DateTime establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten) | ||
{ | ||
_logger.LogInformation($"WebSocket connection closed ({closeReason}) after reading {messagesRead} and writing {messagesWritten} messages over {(timestamp - establishedTime).TotalSeconds:N2} seconds."); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
14 changes: 14 additions & 0 deletions
14
src/ReverseProxy/WebSocketsTelemetry/WebSocketCloseReason.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT License. | ||
|
||
namespace Yarp.ReverseProxy.WebSocketsTelemetry | ||
{ | ||
internal enum WebSocketCloseReason : int | ||
{ | ||
Unknown, | ||
ClientGracefulClose, | ||
ServerGracefulClose, | ||
ClientDisconnect, | ||
ServerDisconnect, | ||
} | ||
} |
142 changes: 142 additions & 0 deletions
142
src/ReverseProxy/WebSocketsTelemetry/WebSocketsParser.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT License. | ||
|
||
using System; | ||
using System.Diagnostics; | ||
using Yarp.ReverseProxy.Utilities; | ||
|
||
namespace Yarp.ReverseProxy.WebSocketsTelemetry | ||
{ | ||
internal unsafe struct WebSocketsParser | ||
{ | ||
private const int MaskLength = 4; | ||
private const int MinHeaderSize = 2; | ||
private const int MaxHeaderSize = MinHeaderSize + MaskLength + sizeof(ulong); | ||
|
||
private fixed byte _leftoverBuffer[MaxHeaderSize - 1]; | ||
private readonly byte _minHeaderSize; | ||
private byte _leftover; | ||
private ulong _bytesToSkip; | ||
private long _closeTime; | ||
private readonly IClock _clock; | ||
|
||
public long MessageCount { get; private set; } | ||
|
||
public DateTime? CloseTime => _closeTime == 0 ? null : new DateTime(_closeTime, DateTimeKind.Utc); | ||
|
||
public WebSocketsParser(IClock clock, bool isServer) | ||
{ | ||
_minHeaderSize = (byte)(MinHeaderSize + (isServer ? MaskLength : 0)); | ||
_leftover = 0; | ||
_bytesToSkip = 0; | ||
_closeTime = 0; | ||
_clock = clock; | ||
MessageCount = 0; | ||
} | ||
|
||
// The WebSocket Protocol: https://datatracker.ietf.org/doc/html/rfc6455#section-5.2 | ||
// 0 1 2 3 | ||
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | ||
// +-+-+-+-+-------+-+-------------+-------------------------------+ | ||
// |F|R|R|R| opcode|M| Payload len | Extended payload length | | ||
// |I|S|S|S| (4) |A| (7) | (16/64) | | ||
// |N|V|V|V| |S| | (if payload len==126/127) | | ||
// | |1|2|3| |K| | | | ||
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | ||
// | Extended payload length continued, if payload len == 127 | | ||
// + - - - - - - - - - - - - - - - +-------------------------------+ | ||
// | |Masking-key, if MASK set to 1 | | ||
// +-------------------------------+-------------------------------+ | ||
// | Masking-key (continued) | Payload Data | | ||
// +-------------------------------- - - - - - - - - - - - - - - - + | ||
// : Payload Data continued ... : | ||
// +---------------------------------------------------------------+ | ||
// | ||
// The header can be 2-10 bytes long, followed by a 4 byte mask if the message was sent by the client. | ||
// We have to read the first 2 bytes to know how long the frame header will be. | ||
// Since the buffer may not contain the full frame, we make use of a leftoverBuffer | ||
// where we store leftover bytes that don't represent a complete frame header. | ||
// On the next call to Consume, we interpret the leftover bytes as the beginning of the frame. | ||
// As we are not interested in the actual payload data, we skip over (payload length + mask length) bytes after each header. | ||
public void Consume(ReadOnlySpan<byte> buffer) | ||
MihaZupan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
int leftover = _leftover; | ||
var bytesToSkip = _bytesToSkip; | ||
|
||
while (true) | ||
{ | ||
var toSkip = Math.Min(bytesToSkip, (ulong)buffer.Length); | ||
buffer = buffer.Slice((int)toSkip); | ||
bytesToSkip -= toSkip; | ||
|
||
var available = leftover + buffer.Length; | ||
int headerSize = _minHeaderSize; | ||
|
||
if (available < headerSize) | ||
{ | ||
break; | ||
} | ||
|
||
var length = (leftover > 1 ? _leftoverBuffer[1] : buffer[1 - leftover]) & 0x7FUL; | ||
|
||
if (length > 125) | ||
{ | ||
// The actual length will be encoded in 2 or 8 bytes, based on whether the length was 126 or 127 | ||
var lengthBytes = 2 << (((int)length & 1) << 1); | ||
headerSize += lengthBytes; | ||
Debug.Assert(leftover < headerSize); | ||
|
||
if (available < headerSize) | ||
{ | ||
break; | ||
} | ||
|
||
lengthBytes += MinHeaderSize; | ||
|
||
length = 0; | ||
for (var i = MinHeaderSize; i < lengthBytes; i++) | ||
{ | ||
length <<= 8; | ||
length |= i < leftover ? _leftoverBuffer[i] : buffer[i - leftover]; | ||
} | ||
} | ||
|
||
Debug.Assert(leftover < headerSize); | ||
bytesToSkip = length; | ||
|
||
const int NonReservedBitsMask = 0b_1000_1111; | ||
var header = (leftover > 0 ? _leftoverBuffer[0] : buffer[0]) & NonReservedBitsMask; | ||
|
||
// Don't count control frames under MessageCount | ||
if ((uint)(header - 0x80) <= 0x02) | ||
{ | ||
// Has FIN (0x80) and is a Continuation (0x00) / Text (0x01) / Binary (0x02) opcode | ||
MessageCount++; | ||
} | ||
else if ((header & 0xF) == 0x8) // CLOSE | ||
{ | ||
if (_closeTime == 0) | ||
{ | ||
_closeTime = _clock.GetUtcNow().Ticks; | ||
} | ||
} | ||
|
||
// Advance the buffer by the number of bytes read for the header, | ||
// accounting for any bytes we may have read from the leftoverBuffer | ||
buffer = buffer.Slice(headerSize - leftover); | ||
leftover = 0; | ||
} | ||
|
||
Debug.Assert(bytesToSkip == 0 || buffer.Length == 0); | ||
_bytesToSkip = bytesToSkip; | ||
|
||
Debug.Assert(leftover + buffer.Length < MaxHeaderSize); | ||
for (var i = 0; i < buffer.Length; i++, leftover++) | ||
{ | ||
_leftoverBuffer[leftover] = buffer[i]; | ||
} | ||
|
||
_leftover = (byte)leftover; | ||
} | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
src/ReverseProxy/WebSocketsTelemetry/WebSocketsTelemetry.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT License. | ||
|
||
using System.Diagnostics.Tracing; | ||
|
||
namespace Yarp.ReverseProxy.WebSocketsTelemetry | ||
{ | ||
[EventSource(Name = "Yarp.ReverseProxy.WebSockets")] | ||
internal sealed class WebSocketsTelemetry : EventSource | ||
{ | ||
public static readonly WebSocketsTelemetry Log = new(); | ||
|
||
[Event(1, Level = EventLevel.Informational)] | ||
public void WebSocketClosed(long establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten) | ||
{ | ||
if (IsEnabled(EventLevel.Informational, EventKeywords.All)) | ||
{ | ||
WriteEvent(eventId: 1, establishedTime, closeReason, messagesRead, messagesWritten); | ||
} | ||
} | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
src/ReverseProxy/WebSocketsTelemetry/WebSocketsTelemetryExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT License. | ||
|
||
using System.Linq; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Yarp.ReverseProxy.Utilities; | ||
using Yarp.ReverseProxy.WebSocketsTelemetry; | ||
|
||
namespace Microsoft.AspNetCore.Builder | ||
{ | ||
/// <summary> | ||
/// <see cref="IApplicationBuilder"/> extension methods to add the <see cref="WebSocketsTelemetryMiddleware"/>. | ||
/// </summary> | ||
public static class WebSocketsTelemetryExtensions | ||
{ | ||
/// <summary> | ||
/// Adds a <see cref="WebSocketsTelemetryMiddleware"/> to the request pipeline. | ||
/// Must be added before <see cref="WebSockets.WebSocketMiddleware"/>. | ||
/// </summary> | ||
public static IApplicationBuilder UseWebSocketsTelemetry(this IApplicationBuilder app) | ||
{ | ||
return app.Use(next => | ||
{ | ||
// Avoid exposing another extension method (AddWebSocketsTelemetry) just because of IClock | ||
var clock = app.ApplicationServices.GetServices<IClock>().FirstOrDefault() ?? new Clock(); | ||
return new WebSocketsTelemetryMiddleware(next, clock).InvokeAsync; | ||
}); | ||
} | ||
} | ||
} |
87 changes: 87 additions & 0 deletions
87
src/ReverseProxy/WebSocketsTelemetry/WebSocketsTelemetryMiddleware.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT License. | ||
|
||
using System; | ||
using System.Diagnostics; | ||
using System.IO; | ||
using System.Threading.Tasks; | ||
using Microsoft.AspNetCore.Http; | ||
using Microsoft.AspNetCore.Http.Features; | ||
using Yarp.ReverseProxy.Utilities; | ||
|
||
namespace Yarp.ReverseProxy.WebSocketsTelemetry | ||
{ | ||
internal sealed class WebSocketsTelemetryMiddleware | ||
{ | ||
private readonly RequestDelegate _next; | ||
private readonly IClock _clock; | ||
|
||
public WebSocketsTelemetryMiddleware(RequestDelegate next, IClock clock) | ||
{ | ||
_next = next ?? throw new ArgumentNullException(nameof(next)); | ||
_clock = clock ?? throw new ArgumentNullException(nameof(clock)); | ||
} | ||
|
||
public Task InvokeAsync(HttpContext context) | ||
{ | ||
if (WebSocketsTelemetry.Log.IsEnabled()) | ||
{ | ||
if (context.Features.Get<IHttpUpgradeFeature>() is { IsUpgradableRequest: true } upgradeFeature) | ||
{ | ||
var upgradeWrapper = new HttpUpgradeFeatureWrapper(_clock, upgradeFeature); | ||
return InvokeAsyncCore(context, upgradeWrapper, _next); | ||
} | ||
} | ||
|
||
return _next(context); | ||
} | ||
|
||
private static async Task InvokeAsyncCore(HttpContext context, HttpUpgradeFeatureWrapper upgradeWrapper, RequestDelegate next) | ||
{ | ||
context.Features.Set<IHttpUpgradeFeature>(upgradeWrapper); | ||
|
||
try | ||
{ | ||
await next(context); | ||
} | ||
finally | ||
{ | ||
if (upgradeWrapper.TelemetryStream is { } telemetryStream) | ||
{ | ||
WebSocketsTelemetry.Log.WebSocketClosed( | ||
telemetryStream.EstablishedTime.Ticks, | ||
telemetryStream.GetCloseReason(context), | ||
telemetryStream.MessagesRead, | ||
telemetryStream.MessagesWritten); | ||
} | ||
MihaZupan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
context.Features.Set(upgradeWrapper.InnerUpgradeFeature); | ||
} | ||
} | ||
|
||
private sealed class HttpUpgradeFeatureWrapper : IHttpUpgradeFeature | ||
{ | ||
private readonly IClock _clock; | ||
|
||
public IHttpUpgradeFeature InnerUpgradeFeature { get; private set; } | ||
|
||
public WebSocketsTelemetryStream? TelemetryStream { get; private set; } | ||
|
||
public bool IsUpgradableRequest => InnerUpgradeFeature.IsUpgradableRequest; | ||
|
||
public HttpUpgradeFeatureWrapper(IClock clock, IHttpUpgradeFeature upgradeFeature) | ||
{ | ||
_clock = clock ?? throw new ArgumentNullException(nameof(clock)); | ||
InnerUpgradeFeature = upgradeFeature ?? throw new ArgumentNullException(nameof(upgradeFeature)); | ||
} | ||
|
||
public async Task<Stream> UpgradeAsync() | ||
{ | ||
Debug.Assert(TelemetryStream is null); | ||
var opaqueTransport = await InnerUpgradeFeature.UpgradeAsync(); | ||
TelemetryStream = new WebSocketsTelemetryStream(_clock, opaqueTransport); | ||
MihaZupan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return TelemetryStream; | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to be a bug where if the request copy finishes first successfully, we will attribute potential response errors to the request side.