Skip to content

Commit

Permalink
[EventHubs] Optimize body eager copying (Azure#31322)
Browse files Browse the repository at this point in the history
* Optimize EagerCopyingMessageBody by making sure the array doesn't need to grow

* Optimize CopyingOnConversionMessageBody by making sure the array doesn't need to grow

* Rename loop variable
  • Loading branch information
danielmarbach authored Sep 22, 2022
1 parent 25b9d3c commit a25ee97
Showing 1 changed file with 68 additions and 33 deletions.
101 changes: 68 additions & 33 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Linq;
using Azure.Core;
using Azure.Core.Amqp;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.Amqp.Framing;

namespace Azure.Messaging.EventHubs.Amqp
Expand Down Expand Up @@ -150,11 +149,7 @@ protected override ReadOnlyMemory<byte> WrittenMemory
{
if (_lazySegments != null)
{
foreach (var segment in _lazySegments)
{
AppendSegment(segment);
}

AppendSegments(_lazySegments);
_lazySegments = null;
}

Expand Down Expand Up @@ -183,21 +178,43 @@ public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator() =>
_segments?.GetEnumerator() ?? _lazySegments.GetEnumerator();

/// <summary>
/// Appends a memory segment to the continuous buffer.
/// Appends memory segments to the continuous buffer.
/// </summary>
///
/// <param name="segment">The memory segment to append.</param>
/// <param name="dataSegments">The memory segments to append.</param>
///
private void AppendSegment(ReadOnlyMemory<byte> segment)
private void AppendSegments(IEnumerable<ReadOnlyMemory<byte>> dataSegments)
{
_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();
int length = 0;
int numberOfSegments = 0;
List<ReadOnlyMemory<byte>> segments = null;
foreach (var segment in dataSegments)
{
segments ??= dataSegments is IReadOnlyCollection<ReadOnlyMemory<byte>> readOnlyList
? new List<ReadOnlyMemory<byte>>(readOnlyList.Count)
: new List<ReadOnlyMemory<byte>>();
length += segment.Length;
numberOfSegments++;
segments.Add(segment);
}

if (segments == null)
{
return;
}

var memory = _writer.GetMemory(segment.Length);
segment.CopyTo(memory);
// fields are lazy initialized to not occupy unnecessary memory when there are no data segments
_writer = length > 0 ? new ArrayBufferWriter<byte>(length) : new ArrayBufferWriter<byte>();
_segments = segments;

_writer.Advance(segment.Length);
_segments.Add(memory.Slice(0, segment.Length));
for (var segmentIndex = 0; segmentIndex < numberOfSegments; segmentIndex++)
{
var dataToAppend = segments[segmentIndex];
var memory = _writer.GetMemory(dataToAppend.Length);
dataToAppend.CopyTo(memory);
_writer.Advance(dataToAppend.Length);
segments[segmentIndex] = memory.Slice(0, dataToAppend.Length);
}
}
}

Expand Down Expand Up @@ -230,10 +247,7 @@ private sealed class EagerCopyingMessageBody : MessageBody
///
public EagerCopyingMessageBody(IEnumerable<Data> dataSegments)
{
foreach (var segment in dataSegments)
{
AppendSegment(segment);
}
AppendSegments(dataSegments);
}

/// <summary>
Expand All @@ -245,28 +259,49 @@ public EagerCopyingMessageBody(IEnumerable<Data> dataSegments)
public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator() => _segments.GetEnumerator();

/// <summary>
/// Appends a memory segment to the continuous buffer.
/// Appends memory segments to the continuous buffer.
/// </summary>
///
/// <param name="segment">The memory segment to append.</param>
/// <param name="dataSegments">The memory segments to append.</param>
///
private void AppendSegment(Data segment)
private void AppendSegments(IEnumerable<Data> dataSegments)
{
_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();
int length = 0;
int numberOfSegments = 0;
List<ReadOnlyMemory<byte>> segments = null;
foreach (var segment in dataSegments)
{
segments ??= dataSegments is IReadOnlyCollection<Data> readOnlyList
? new List<ReadOnlyMemory<byte>>(readOnlyList.Count)
: new List<ReadOnlyMemory<byte>>();
ReadOnlyMemory<byte> dataToAppend = segment.Value switch
{
byte[] byteArray => byteArray,
ArraySegment<byte> arraySegment => arraySegment,
_ => ReadOnlyMemory<byte>.Empty
};
length += dataToAppend.Length;
numberOfSegments++;
segments.Add(dataToAppend);
}

ReadOnlyMemory<byte> dataToAppend = segment.Value switch
if (segments == null)
{
byte[] byteArray => byteArray,
ArraySegment<byte> arraySegment => arraySegment,
_ => ReadOnlyMemory<byte>.Empty
};
return;
}

var memory = _writer.GetMemory(dataToAppend.Length);
dataToAppend.CopyTo(memory);
// fields are lazy initialized to not occupy unnecessary memory when there are no data segments
_writer = length > 0 ? new ArrayBufferWriter<byte>(length) : new ArrayBufferWriter<byte>();
_segments = segments;

_writer.Advance(dataToAppend.Length);
_segments.Add(memory.Slice(0, dataToAppend.Length));
for (var segmentIndex = 0; segmentIndex < numberOfSegments; segmentIndex++)
{
var dataToAppend = segments[segmentIndex];
var memory = _writer.GetMemory(dataToAppend.Length);
dataToAppend.CopyTo(memory);
_writer.Advance(dataToAppend.Length);
segments[segmentIndex] = memory.Slice(0, dataToAppend.Length);
}
}
}
}
Expand Down

0 comments on commit a25ee97

Please sign in to comment.