Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSockets telemetry #1237

Merged
merged 6 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions samples/ReverseProxy.Metrics.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public void ConfigureServices(IServiceCollection services)
// Registration of a consumer to events for HttpClient telemetry
// Note: this depends on changes implemented in .NET 5
services.AddTelemetryConsumer<HttpClientTelemetryConsumer>();

services.AddTelemetryConsumer<WebSocketsTelemetryConsumer>();
}

/// <summary>
Expand All @@ -55,6 +57,9 @@ public void Configure(IApplicationBuilder app)
// Placed at the beginning so it is the first and last thing run for each request
app.UsePerRequestMetricCollection();

// Middleware used to intercept the WebSocket connection and collect telemetry exposed to WebSocketsTelemetryConsumer
app.UseWebSocketsTelemetry();

app.UseRouting();
app.UseEndpoints(endpoints =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using Microsoft.Extensions.Logging;
using Yarp.Telemetry.Consumption;

namespace Yarp.Sample
{
public sealed class WebSocketsTelemetryConsumer : IWebSocketsTelemetryConsumer
{
private readonly ILogger<WebSocketsTelemetryConsumer> _logger;

public WebSocketsTelemetryConsumer(ILogger<WebSocketsTelemetryConsumer> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public void OnWebSocketClosed(DateTime timestamp, DateTime establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten)
{
_logger.LogInformation($"WebSocket connection closed ({closeReason}) after reading {messagesRead} and writing {messagesWritten} messages over {(timestamp - establishedTime).TotalSeconds:N2} seconds.");
}
}
}
2 changes: 1 addition & 1 deletion src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext conte
var (secondResult, secondException) = await secondTask;
if (secondResult != StreamCopyResult.Success)
{
error = ReportResult(context, requestFinishedFirst, secondResult, secondException!);
error = ReportResult(context, !requestFinishedFirst, secondResult, secondException!);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be a bug where if the request copy finishes first successfully, we will attribute potential response errors to the request side.

}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Yarp.Tests.Common
namespace Yarp.ReverseProxy.Utilities
{
// Taken from https://github.com/dotnet/runtime/blob/00f37bc13b4edbba1afca9e98d74432a94f5192f/src/libraries/Common/src/System/IO/DelegatingStream.cs
// Forwards all calls to an inner stream except where overridden in a derived class.
internal abstract class DelegatingStream : Stream
{
Expand Down Expand Up @@ -113,9 +114,9 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
return _innerStream.ReadAsync(buffer, cancellationToken);
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
return _innerStream.BeginRead(buffer, offset, count, callback, state);
return _innerStream.BeginRead(buffer, offset, count, callback!, state);
}

public override int EndRead(IAsyncResult asyncResult)
Expand Down Expand Up @@ -167,9 +168,9 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
return _innerStream.WriteAsync(buffer, cancellationToken);
}

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
return _innerStream.BeginWrite(buffer, offset, count, callback, state);
return _innerStream.BeginWrite(buffer, offset, count, callback!, state);
}

public override void EndWrite(IAsyncResult asyncResult)
Expand Down
14 changes: 14 additions & 0 deletions src/ReverseProxy/WebSocketsTelemetry/WebSocketCloseReason.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
internal enum WebSocketCloseReason : int
{
Unknown,
ClientGracefulClose,
ServerGracefulClose,
ClientDisconnect,
ServerDisconnect,
}
}
142 changes: 142 additions & 0 deletions src/ReverseProxy/WebSocketsTelemetry/WebSocketsParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using Yarp.ReverseProxy.Utilities;

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
internal unsafe struct WebSocketsParser
{
private const int MaskLength = 4;
private const int MinHeaderSize = 2;
private const int MaxHeaderSize = MinHeaderSize + MaskLength + sizeof(ulong);

private fixed byte _leftoverBuffer[MaxHeaderSize - 1];
private readonly byte _minHeaderSize;
private byte _leftover;
private ulong _bytesToSkip;
private long _closeTime;
private readonly IClock _clock;

public long MessageCount { get; private set; }

public DateTime? CloseTime => _closeTime == 0 ? null : new DateTime(_closeTime, DateTimeKind.Utc);

public WebSocketsParser(IClock clock, bool isServer)
{
_minHeaderSize = (byte)(MinHeaderSize + (isServer ? MaskLength : 0));
_leftover = 0;
_bytesToSkip = 0;
_closeTime = 0;
_clock = clock;
MessageCount = 0;
}

// The WebSocket Protocol: https://datatracker.ietf.org/doc/html/rfc6455#section-5.2
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
// |I|S|S|S| (4) |A| (7) | (16/64) |
// |N|V|V|V| |S| | (if payload len==126/127) |
// | |1|2|3| |K| | |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// | Extended payload length continued, if payload len == 127 |
// + - - - - - - - - - - - - - - - +-------------------------------+
// | |Masking-key, if MASK set to 1 |
// +-------------------------------+-------------------------------+
// | Masking-key (continued) | Payload Data |
// +-------------------------------- - - - - - - - - - - - - - - - +
// : Payload Data continued ... :
// +---------------------------------------------------------------+
//
// The header can be 2-10 bytes long, followed by a 4 byte mask if the message was sent by the client.
// We have to read the first 2 bytes to know how long the frame header will be.
// Since the buffer may not contain the full frame, we make use of a leftoverBuffer
// where we store leftover bytes that don't represent a complete frame header.
// On the next call to Consume, we interpret the leftover bytes as the beginning of the frame.
// As we are not interested in the actual payload data, we skip over (payload length + mask length) bytes after each header.
public void Consume(ReadOnlySpan<byte> buffer)
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
{
int leftover = _leftover;
var bytesToSkip = _bytesToSkip;

while (true)
{
var toSkip = Math.Min(bytesToSkip, (ulong)buffer.Length);
buffer = buffer.Slice((int)toSkip);
bytesToSkip -= toSkip;

var available = leftover + buffer.Length;
int headerSize = _minHeaderSize;

if (available < headerSize)
{
break;
}

var length = (leftover > 1 ? _leftoverBuffer[1] : buffer[1 - leftover]) & 0x7FUL;

if (length > 125)
{
// The actual length will be encoded in 2 or 8 bytes, based on whether the length was 126 or 127
var lengthBytes = 2 << (((int)length & 1) << 1);
headerSize += lengthBytes;
Debug.Assert(leftover < headerSize);

if (available < headerSize)
{
break;
}

lengthBytes += MinHeaderSize;

length = 0;
for (var i = MinHeaderSize; i < lengthBytes; i++)
{
length <<= 8;
length |= i < leftover ? _leftoverBuffer[i] : buffer[i - leftover];
}
}

Debug.Assert(leftover < headerSize);
bytesToSkip = length;

const int NonReservedBitsMask = 0b_1000_1111;
var header = (leftover > 0 ? _leftoverBuffer[0] : buffer[0]) & NonReservedBitsMask;

// Don't count control frames under MessageCount
if ((uint)(header - 0x80) <= 0x02)
{
// Has FIN (0x80) and is a Continuation (0x00) / Text (0x01) / Binary (0x02) opcode
MessageCount++;
}
else if ((header & 0xF) == 0x8) // CLOSE
{
if (_closeTime == 0)
{
_closeTime = _clock.GetUtcNow().Ticks;
}
}

// Advance the buffer by the number of bytes read for the header,
// accounting for any bytes we may have read from the leftoverBuffer
buffer = buffer.Slice(headerSize - leftover);
leftover = 0;
}

Debug.Assert(bytesToSkip == 0 || buffer.Length == 0);
_bytesToSkip = bytesToSkip;

Debug.Assert(leftover + buffer.Length < MaxHeaderSize);
for (var i = 0; i < buffer.Length; i++, leftover++)
{
_leftoverBuffer[leftover] = buffer[i];
}

_leftover = (byte)leftover;
}
}
}
22 changes: 22 additions & 0 deletions src/ReverseProxy/WebSocketsTelemetry/WebSocketsTelemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics.Tracing;

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
[EventSource(Name = "Yarp.ReverseProxy.WebSockets")]
internal sealed class WebSocketsTelemetry : EventSource
{
public static readonly WebSocketsTelemetry Log = new();

[Event(1, Level = EventLevel.Informational)]
public void WebSocketClosed(long establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten)
{
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
WriteEvent(eventId: 1, establishedTime, closeReason, messagesRead, messagesWritten);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Yarp.ReverseProxy.Utilities;
using Yarp.ReverseProxy.WebSocketsTelemetry;

namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// <see cref="IApplicationBuilder"/> extension methods to add the <see cref="WebSocketsTelemetryMiddleware"/>.
/// </summary>
public static class WebSocketsTelemetryExtensions
{
/// <summary>
/// Adds a <see cref="WebSocketsTelemetryMiddleware"/> to the request pipeline.
/// Must be added before <see cref="WebSockets.WebSocketMiddleware"/>.
/// </summary>
public static IApplicationBuilder UseWebSocketsTelemetry(this IApplicationBuilder app)
{
return app.Use(next =>
{
// Avoid exposing another extension method (AddWebSocketsTelemetry) just because of IClock
var clock = app.ApplicationServices.GetServices<IClock>().FirstOrDefault() ?? new Clock();
return new WebSocketsTelemetryMiddleware(next, clock).InvokeAsync;
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Yarp.ReverseProxy.Utilities;

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
internal sealed class WebSocketsTelemetryMiddleware
{
private readonly RequestDelegate _next;
private readonly IClock _clock;

public WebSocketsTelemetryMiddleware(RequestDelegate next, IClock clock)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
}

public Task InvokeAsync(HttpContext context)
{
if (WebSocketsTelemetry.Log.IsEnabled())
{
if (context.Features.Get<IHttpUpgradeFeature>() is { IsUpgradableRequest: true } upgradeFeature)
{
var upgradeWrapper = new HttpUpgradeFeatureWrapper(_clock, upgradeFeature);
return InvokeAsyncCore(context, upgradeWrapper, _next);
}
}

return _next(context);
}

private static async Task InvokeAsyncCore(HttpContext context, HttpUpgradeFeatureWrapper upgradeWrapper, RequestDelegate next)
{
context.Features.Set<IHttpUpgradeFeature>(upgradeWrapper);

try
{
await next(context);
}
finally
{
if (upgradeWrapper.TelemetryStream is { } telemetryStream)
{
WebSocketsTelemetry.Log.WebSocketClosed(
telemetryStream.EstablishedTime.Ticks,
telemetryStream.GetCloseReason(context),
telemetryStream.MessagesRead,
telemetryStream.MessagesWritten);
}
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved

context.Features.Set(upgradeWrapper.InnerUpgradeFeature);
}
}

private sealed class HttpUpgradeFeatureWrapper : IHttpUpgradeFeature
{
private readonly IClock _clock;

public IHttpUpgradeFeature InnerUpgradeFeature { get; private set; }

public WebSocketsTelemetryStream? TelemetryStream { get; private set; }

public bool IsUpgradableRequest => InnerUpgradeFeature.IsUpgradableRequest;

public HttpUpgradeFeatureWrapper(IClock clock, IHttpUpgradeFeature upgradeFeature)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
InnerUpgradeFeature = upgradeFeature ?? throw new ArgumentNullException(nameof(upgradeFeature));
}

public async Task<Stream> UpgradeAsync()
{
Debug.Assert(TelemetryStream is null);
var opaqueTransport = await InnerUpgradeFeature.UpgradeAsync();
TelemetryStream = new WebSocketsTelemetryStream(_clock, opaqueTransport);
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
return TelemetryStream;
}
}
}
}
Loading