Copy the PublishPacket and then queue it#2115
Copy the PublishPacket and then queue it#2115xljiulang wants to merge 4 commits intodotnet:masterfrom
Conversation
|
I found that this is a stubborn bug. After making the above fix, if I modify the logic of MqttChannelAdapter receiving data packets, the bug still exists. So once a packet is received, we have to copy its payload out instead of using the Pool memory? In Mqttnet.AspNetCore, using Pool memory is the default behavior, which means that the project has never handled this situation correctly. async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}
var readFixedHeaderResult = await ReadFixedHeaderAsync(cancellationToken).ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}
if (readFixedHeaderResult.IsConnectionClosed)
{
return ReceivedMqttPacket.Empty;
}
var fixedHeader = readFixedHeaderResult.FixedHeader;
if (fixedHeader.RemainingLength == 0)
{
return new ReceivedMqttPacket(fixedHeader.Flags, ReadOnlySequence<byte>.Empty, 2);
}
var bodyLength = fixedHeader.RemainingLength;
// Return and clear the previous body buffer
_bodyOwner?.Dispose();
// Re-rent a body buffer
_bodyOwner = BufferOwner.Rent(bodyLength);
var body = _bodyOwner.Buffer;
var bodyOffset = 0;
var chunkSize = Math.Min(ReadBufferSize, bodyLength);
do
{
var bytesLeft = bodyLength - bodyOffset;
if (chunkSize > bytesLeft)
{
chunkSize = bytesLeft;
}
var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}
if (readBytes == 0)
{
return ReceivedMqttPacket.Empty;
}
bodyOffset += readBytes;
} while (bodyOffset < bodyLength);
PacketInspector?.FillReceiveBuffer(body.AsSpan(0, bodyLength));
var bodySegment = body.AsMemory(0, bodyLength);
var bodySequence = new ReadOnlySequence<byte>(bodySegment);
return new ReceivedMqttPacket(fixedHeader.Flags, bodySequence, fixedHeader.TotalLength);
}private sealed class BufferOwner : IDisposable
{
private bool _disposed = false;
public byte[] Buffer { get; private set; }
public static BufferOwner Rent(int bufferSieze)
{
return new BufferOwner()
{
Buffer = ArrayPool<byte>.Shared.Rent(bufferSieze)
};
}
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
ArrayPool<byte>.Shared.Return(Buffer);
}
}
} |
| /// <summary> | ||
| /// When enabled, received ApplicationMessage will be deep cloned and enqueued. | ||
| /// </summary> | ||
| public bool ReceivedApplicationMessageQueueable { get; set; } = true; |
There was a problem hiding this comment.
I want to keep this switch configuration in Options and let users choose. Can we radically change the default to false to reduce GC pressure?
| if (Options.ReceivedApplicationMessageQueueable) | ||
| { | ||
| // publishPacket must be copied | ||
| EnqueueReceivedPublishPacket(publishPacket.Clone()); |
There was a problem hiding this comment.
The publishPacket here must be cloned, otherwise problems will occur after replacing it with IMqttClientAdapterFactory of mqttnet.asp.
We can consider using Pool memory for MQTTnet.Adapter.MqttChannelAdapter to avoid unnecessary cloning of publishPacket.
This PR fixes #2113