Skip to content

Commit

Permalink
Add WebSockets telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
MihaZupan committed Sep 21, 2021
1 parent 5583eb7 commit c11308f
Show file tree
Hide file tree
Showing 16 changed files with 844 additions and 22 deletions.
5 changes: 5 additions & 0 deletions samples/ReverseProxy.Metrics.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public void ConfigureServices(IServiceCollection services)
// Registration of a consumer to events for HttpClient telemetry
// Note: this depends on changes implemented in .NET 5
services.AddTelemetryConsumer<HttpClientTelemetryConsumer>();

services.AddTelemetryConsumer<WebSocketsTelemetryConsumer>();
}

/// <summary>
Expand All @@ -55,6 +57,9 @@ public void Configure(IApplicationBuilder app)
// Placed at the beginning so it is the first and last thing run for each request
app.UsePerRequestMetricCollection();

// Middleware used to intercept the WebSocket connection and collect telemetry exposed to WebSocketsTelemetryConsumer
app.UseWebSocketsTelemetry();

app.UseRouting();
app.UseEndpoints(endpoints =>
{
Expand Down
21 changes: 21 additions & 0 deletions samples/ReverseProxy.Metrics.Sample/WebSocketsTelemetryConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using Microsoft.Extensions.Logging;
using Yarp.Telemetry.Consumption;

namespace Yarp.Sample
{
public sealed class WebSocketsTelemetryConsumer : IWebSocketsTelemetryConsumer
{
private readonly ILogger<WebSocketsTelemetryConsumer> _logger;

public WebSocketsTelemetryConsumer(ILogger<WebSocketsTelemetryConsumer> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public void OnWebSocketClosed(DateTime timestamp, DateTime establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten)
{
_logger.LogInformation($"WebSocket connection closed ({closeReason}) after reading {messagesRead} and writing {messagesWritten} messages over {(timestamp - establishedTime).TotalSeconds:N2} seconds.");
}
}
}
2 changes: 1 addition & 1 deletion src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext conte
var (secondResult, secondException) = await secondTask;
if (secondResult != StreamCopyResult.Success)
{
error = ReportResult(context, requestFinishedFirst, secondResult, secondException!);
error = ReportResult(context, !requestFinishedFirst, secondResult, secondException!);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Yarp.Tests.Common
namespace Yarp.ReverseProxy.Utilities
{
// Taken from https://github.com/dotnet/runtime/blob/00f37bc13b4edbba1afca9e98d74432a94f5192f/src/libraries/Common/src/System/IO/DelegatingStream.cs
// Forwards all calls to an inner stream except where overridden in a derived class.
internal abstract class DelegatingStream : Stream
{
Expand Down Expand Up @@ -113,9 +114,9 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
return _innerStream.ReadAsync(buffer, cancellationToken);
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
return _innerStream.BeginRead(buffer, offset, count, callback, state);
return _innerStream.BeginRead(buffer, offset, count, callback!, state);
}

public override int EndRead(IAsyncResult asyncResult)
Expand Down Expand Up @@ -167,9 +168,9 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
return _innerStream.WriteAsync(buffer, cancellationToken);
}

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
return _innerStream.BeginWrite(buffer, offset, count, callback, state);
return _innerStream.BeginWrite(buffer, offset, count, callback!, state);
}

public override void EndWrite(IAsyncResult asyncResult)
Expand Down
14 changes: 14 additions & 0 deletions src/ReverseProxy/WebSocketsTelemetry/WebSocketCloseReason.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
internal enum WebSocketCloseReason : int
{
Unknown,
ClientGracefulClose,
ServerGracefulClose,
ClientDisconnect,
ServerDisconnect,
}
}
132 changes: 132 additions & 0 deletions src/ReverseProxy/WebSocketsTelemetry/WebSocketsParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
internal unsafe struct WebSocketsParser
{
private const int MaskLength = 4;
private const int MinHeaderSize = 2;
private const int MaxHeaderSize = MinHeaderSize + MaskLength + sizeof(ulong);

private fixed byte _leftoverBuffer[MaxHeaderSize - 1];
private readonly byte _minHeaderSize;
private byte _leftover;
private ulong _bytesToSkip;

public long MessageCount { get; private set; }

public DateTime? CloseTime { get; private set; }

public WebSocketsParser(bool isServer)
{
_minHeaderSize = (byte)(MinHeaderSize + (isServer ? MaskLength : 0));
_leftover = 0;
_bytesToSkip = 0;
MessageCount = 0;
CloseTime = null;
}

// The WebSocket Protocol: https://datatracker.ietf.org/doc/html/rfc6455#section-5.2
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
// |I|S|S|S| (4) |A| (7) | (16/64) |
// |N|V|V|V| |S| | (if payload len==126/127) |
// | |1|2|3| |K| | |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// | Extended payload length continued, if payload len == 127 |
// + - - - - - - - - - - - - - - - +-------------------------------+
// | |Masking-key, if MASK set to 1 |
// +-------------------------------+-------------------------------+
// | Masking-key (continued) | Payload Data |
// +-------------------------------- - - - - - - - - - - - - - - - +
// : Payload Data continued ... :
// +---------------------------------------------------------------+
//
// The header can be 2-10 bytes long, followed by a 4 byte mask if the message was sent by the client.
// We have to read the first 2 bytes to know how long the frame header will be.
// Since the buffer may not contain the full frame, we make use of a leftoverBuffer
// where we store leftover bytes that don't represent a complete frame header.
// On the next call to Consume, we interpret the leftover bytes as the beginning of the frame.
// As we are not interested in the actual payload data, we skip over (payload length + mask length) bytes after each header.
public void Consume(ReadOnlySpan<byte> buffer)
{
int leftover = _leftover;
var bytesToSkip = _bytesToSkip;

while (true)
{
var toSkip = Math.Min(bytesToSkip, (ulong)buffer.Length);
buffer = buffer.Slice((int)toSkip);
bytesToSkip -= toSkip;

var available = leftover + buffer.Length;
int headerSize = _minHeaderSize;

if (available < headerSize)
{
break;
}

var length = (leftover > 1 ? _leftoverBuffer[1] : buffer[1 - leftover]) & 0x7FUL;

if (length > 125)
{
// The actual length will be encoded in 2 or 8 bytes, based on whether the length was 126 or 127
var lengthBytes = 2 << (((int)length & 1) << 1);
headerSize += lengthBytes;
Debug.Assert(leftover < headerSize);

if (available < headerSize)
{
break;
}

lengthBytes += MinHeaderSize;

length = 0;
for (var i = MinHeaderSize; i < lengthBytes; i++)
{
length <<= 8;
length |= i < leftover ? _leftoverBuffer[i] : buffer[i - leftover];
}
}

Debug.Assert(leftover < headerSize);
bytesToSkip = length;

int header = leftover > 0 ? _leftoverBuffer[0] : buffer[0];

if ((header & 0xF) == 0x8) // CLOSE
{
CloseTime ??= DateTime.UtcNow;
}
else if ((header & 0x80) != 0) // FIN
{
MessageCount++;
}

// Advance the buffer by the number of bytes read for the header,
// accounting for any bytes we may have read from the leftoverBuffer
buffer = buffer.Slice(headerSize - leftover);
leftover = 0;
}

Debug.Assert(bytesToSkip == 0 || buffer.Length == 0);
_bytesToSkip = bytesToSkip;

Debug.Assert(leftover + buffer.Length < MaxHeaderSize);
for (var i = 0; i < buffer.Length; i++, leftover++)
{
_leftoverBuffer[leftover] = buffer[i];
}

_leftover = (byte)leftover;
}
}
}
22 changes: 22 additions & 0 deletions src/ReverseProxy/WebSocketsTelemetry/WebSocketsTelemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics.Tracing;

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
[EventSource(Name = "Yarp.ReverseProxy.WebSockets")]
internal sealed class WebSocketsTelemetry : EventSource
{
public static readonly WebSocketsTelemetry Log = new();

[Event(1, Level = EventLevel.Informational)]
public void WebSocketClosed(long establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten)
{
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
WriteEvent(eventId: 1, establishedTime, closeReason, messagesRead, messagesWritten);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Yarp.ReverseProxy.WebSocketsTelemetry;

namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// <see cref="IApplicationBuilder"/> extension methods to add the <see cref="WebSocketsTelemetryMiddleware"/>.
/// </summary>
public static class WebSocketsTelemetryExtensions
{
/// <summary>
/// Adds a <see cref="WebSocketsTelemetryMiddleware"/> to the request pipeline.
/// Must be added before <see cref="WebSockets.WebSocketMiddleware"/>.
/// </summary>
public static IApplicationBuilder UseWebSocketsTelemetry(this IApplicationBuilder app)
{
return app.UseMiddleware<WebSocketsTelemetryMiddleware>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;

namespace Yarp.ReverseProxy.WebSocketsTelemetry
{
internal sealed class WebSocketsTelemetryMiddleware
{
private readonly RequestDelegate _next;

public WebSocketsTelemetryMiddleware(RequestDelegate next)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
}

public Task Invoke(HttpContext context)
{
if (WebSocketsTelemetry.Log.IsEnabled())
{
if (context.Features.Get<IHttpUpgradeFeature>() is { IsUpgradableRequest: true } upgradeFeature)
{
return InvokeAsyncCore(context, upgradeFeature, _next);
}
}

return _next(context);
}

private static async Task InvokeAsyncCore(HttpContext context, IHttpUpgradeFeature upgradeFeature, RequestDelegate next)
{
var upgradeWrapper = new HttpUpgradeFeatureWrapper(upgradeFeature);
context.Features.Set<IHttpUpgradeFeature>(upgradeWrapper);

try
{
await next(context);
}
finally
{
if (upgradeWrapper.TelemetryStream is { } telemetryStream)
{
WebSocketsTelemetry.Log.WebSocketClosed(
telemetryStream.EstablishedTime.Ticks,
telemetryStream.GetCloseReason(context),
telemetryStream.MessagesRead,
telemetryStream.MessagesWritten);
}

context.Features.Set(upgradeFeature);
}
}

private sealed class HttpUpgradeFeatureWrapper : IHttpUpgradeFeature
{
private readonly IHttpUpgradeFeature _upgradeFeature;

public WebSocketsTelemetryStream? TelemetryStream { get; private set; }

public bool IsUpgradableRequest => _upgradeFeature.IsUpgradableRequest;

public HttpUpgradeFeatureWrapper(IHttpUpgradeFeature upgradeFeature)
{
_upgradeFeature = upgradeFeature ?? throw new ArgumentNullException(nameof(upgradeFeature));
}

public async Task<Stream> UpgradeAsync()
{
Debug.Assert(TelemetryStream is null);
var opaqueTransport = await _upgradeFeature.UpgradeAsync();
TelemetryStream = new WebSocketsTelemetryStream(opaqueTransport);
return TelemetryStream;
}
}
}
}
Loading

0 comments on commit c11308f

Please sign in to comment.