Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of column cloning inside DataFrame arithmetics #6814

Merged
merged 10 commits into from
Sep 27, 2023
6 changes: 3 additions & 3 deletions src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ private void Append(ReadOnlySpan<byte> value)
_offsetsBuffers.Add(mutableOffsetsBuffer);
mutableOffsetsBuffer.Append(0);
}
mutableDataBuffer.EnsureCapacity(value.Length);
value.CopyTo(mutableDataBuffer.RawSpan.Slice(mutableDataBuffer.Length));
mutableDataBuffer.Length += value.Length;
var startIndex = mutableDataBuffer.Length;
mutableDataBuffer.IncreaseSize(value.Length);
value.CopyTo(mutableDataBuffer.RawSpan.Slice(startIndex));
mutableOffsetsBuffer.Append(mutableOffsetsBuffer[mutableOffsetsBuffer.Length - 1] + value.Length);
}
SetValidityBit(Length - 1, value != default);
Expand Down
31 changes: 22 additions & 9 deletions src/Microsoft.Data.Analysis/DataFrameBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace Microsoft.Data.Analysis
internal class DataFrameBuffer<T> : ReadOnlyDataFrameBuffer<T>
where T : unmanaged
{
private const int MinCapacity = 8;

private Memory<byte> _memory;

public override ReadOnlyMemory<byte> ReadOnlyBuffer => _memory;
Expand All @@ -36,24 +38,35 @@ public Span<T> RawSpan
get => MemoryMarshal.Cast<byte, T>(Buffer.Span);
}

public DataFrameBuffer(int numberOfValues = 8) : base(numberOfValues) { }
/// <summary>
/// Creates an empty mutable buffer with room pre-allocated for at least
/// <paramref name="capacity"/> elements of type <typeparamref name="T"/>.
/// </summary>
/// <param name="capacity">
/// Requested capacity, expressed in elements (not bytes). Values below
/// <c>MinCapacity</c> are rounded up to <c>MinCapacity</c>.
/// </param>
/// <exception cref="ArgumentException">
/// Thrown when <c>capacity * Size</c> is greater than <c>MaxCapacity</c>.
/// </exception>
public DataFrameBuffer(int capacity = 0)
{
    // Widen to long so the multiplication cannot overflow before the comparison.
    if ((long)capacity * Size > MaxCapacity)
    {
        throw new ArgumentException($"{capacity} exceeds buffer capacity", nameof(capacity));
    }

    // The backing store is a byte array that is reinterpreted as T, so the element
    // count has to be scaled to bytes. Allocating only `capacity` bytes would
    // under-allocate for every T wider than one byte and force an immediate regrow.
    // NOTE(review): assumes Size is the byte width of T (declared on the base class) - confirm.
    // The guard above keeps capacity * Size within int range, so this cannot overflow.
    _memory = new byte[Math.Max(capacity, MinCapacity) * Size];
}

internal DataFrameBuffer(ReadOnlyMemory<byte> buffer, int length) : base(buffer, length)
internal DataFrameBuffer(ReadOnlyMemory<byte> buffer, int length)
{
_memory = new byte[buffer.Length];
buffer.CopyTo(_memory);
Length = length;
}

public void Append(T value)
{
if (Length == MaxCapacity)
{
throw new ArgumentException("Current buffer is full", nameof(value));
}
EnsureCapacity(1);
if (Length < MaxCapacity)
++Length;
Span[Length - 1] = value;

RawSpan[Length] = value;
Length++;
}

/// <summary>
/// Extends the logical length of the buffer by <paramref name="numberOfValues"/> elements,
/// calling <see cref="EnsureCapacity"/> for that many values first.
/// </summary>
/// <param name="numberOfValues">Number of elements to add to <c>Length</c>.</param>
/// <remarks>
/// This method does not write to the newly exposed elements; the callers visible in this
/// change fill or copy into them afterwards.
/// NOTE(review): whether the new elements are guaranteed to be zeroed depends on
/// EnsureCapacity, which is not shown here - confirm before relying on it.
/// </remarks>
public void IncreaseSize(int numberOfValues)
{
EnsureCapacity(numberOfValues);
Length += numberOfValues;
}

public void EnsureCapacity(int numberOfValues)
Expand Down
65 changes: 20 additions & 45 deletions src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ public PrimitiveColumnContainer(ReadOnlyMemory<byte> buffer, ReadOnlyMemory<byte
ReadOnlyDataFrameBuffer<T> dataBuffer;
if (buffer.IsEmpty)
{
DataFrameBuffer<T> mutableBuffer = new DataFrameBuffer<T>();
mutableBuffer.EnsureCapacity(length);
mutableBuffer.Length = length;
DataFrameBuffer<T> mutableBuffer = new DataFrameBuffer<T>(length);
mutableBuffer.IncreaseSize(length);
mutableBuffer.RawSpan.Fill(default(T));
dataBuffer = mutableBuffer;
}
Expand Down Expand Up @@ -172,15 +171,12 @@ public void AppendMany(T? value, long count)

//Calculate how many values we can additionally allocate and not exceed the MaxCapacity
int allocatable = (int)Math.Min(remaining, ReadOnlyDataFrameBuffer<T>.MaxCapacity - mutableLastBuffer.Length);
mutableLastBuffer.EnsureCapacity(allocatable);
mutableLastBuffer.IncreaseSize(allocatable);

DataFrameBuffer<byte> lastNullBitMapBuffer = NullBitMapBuffers.GetOrCreateMutable(NullBitMapBuffers.Count - 1);
int nullBufferAllocatable = (allocatable + 7) / 8;
lastNullBitMapBuffer.EnsureCapacity(nullBufferAllocatable);
lastNullBitMapBuffer.IncreaseSize(nullBufferAllocatable);


mutableLastBuffer.Length += allocatable;
lastNullBitMapBuffer.Length += nullBufferAllocatable;
Length += allocatable;

if (value.HasValue)
Expand Down Expand Up @@ -436,13 +432,8 @@ private List<ReadOnlyDataFrameBuffer<byte>> CloneNullBitMapBuffers()
List<ReadOnlyDataFrameBuffer<byte>> ret = new List<ReadOnlyDataFrameBuffer<byte>>();
foreach (ReadOnlyDataFrameBuffer<byte> buffer in NullBitMapBuffers)
{
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>();
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>(buffer.ReadOnlyBuffer, buffer.Length);
ret.Add(newBuffer);
ReadOnlySpan<byte> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
newBuffer.Append(span[i]);
}
}
return ret;
}
Expand Down Expand Up @@ -518,14 +509,9 @@ public PrimitiveColumnContainer<T> Clone()
var ret = new PrimitiveColumnContainer<T>();
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
DataFrameBuffer<T> newBuffer = new DataFrameBuffer<T>();
DataFrameBuffer<T> newBuffer = new DataFrameBuffer<T>(buffer.ReadOnlyBuffer, buffer.Length);
ret.Buffers.Add(newBuffer);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
ret.Length += buffer.Length;
for (int i = 0; i < span.Length; i++)
{
newBuffer.Append(span[i]);
}
}
ret.NullBitMapBuffers = CloneNullBitMapBuffers();
ret.NullCount = NullCount;
Expand All @@ -537,9 +523,10 @@ internal PrimitiveColumnContainer<bool> CloneAsBoolContainer()
var ret = new PrimitiveColumnContainer<bool>();
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
DataFrameBuffer<bool> newBuffer = new DataFrameBuffer<bool>();
DataFrameBuffer<bool> newBuffer = new DataFrameBuffer<bool>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
newBuffer.IncreaseSize(buffer.Length);

if (typeof(T) == typeof(bool))
{
var localBuffer = buffer;
Expand All @@ -550,7 +537,6 @@ internal PrimitiveColumnContainer<bool> CloneAsBoolContainer()
{
newBuffer.Span.Fill(false);
}
newBuffer.Length = buffer.Length;
ret.Length += buffer.Length;
}
ret.NullBitMapBuffers = CloneNullBitMapBuffers();
Expand All @@ -564,9 +550,8 @@ internal PrimitiveColumnContainer<byte> CloneAsByteContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>();
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -584,9 +569,8 @@ internal PrimitiveColumnContainer<sbyte> CloneAsSByteContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<sbyte> newBuffer = new DataFrameBuffer<sbyte>();
DataFrameBuffer<sbyte> newBuffer = new DataFrameBuffer<sbyte>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -604,9 +588,8 @@ internal PrimitiveColumnContainer<double> CloneAsDoubleContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<double> newBuffer = new DataFrameBuffer<double>();
DataFrameBuffer<double> newBuffer = new DataFrameBuffer<double>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -624,9 +607,8 @@ internal PrimitiveColumnContainer<decimal> CloneAsDecimalContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<decimal> newBuffer = new DataFrameBuffer<decimal>();
DataFrameBuffer<decimal> newBuffer = new DataFrameBuffer<decimal>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -644,9 +626,8 @@ internal PrimitiveColumnContainer<short> CloneAsShortContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<short> newBuffer = new DataFrameBuffer<short>();
DataFrameBuffer<short> newBuffer = new DataFrameBuffer<short>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -664,9 +645,8 @@ internal PrimitiveColumnContainer<ushort> CloneAsUShortContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<ushort> newBuffer = new DataFrameBuffer<ushort>();
DataFrameBuffer<ushort> newBuffer = new DataFrameBuffer<ushort>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -684,9 +664,8 @@ internal PrimitiveColumnContainer<int> CloneAsIntContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<int> newBuffer = new DataFrameBuffer<int>();
DataFrameBuffer<int> newBuffer = new DataFrameBuffer<int>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -704,9 +683,8 @@ internal PrimitiveColumnContainer<uint> CloneAsUIntContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<uint> newBuffer = new DataFrameBuffer<uint>();
DataFrameBuffer<uint> newBuffer = new DataFrameBuffer<uint>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -724,9 +702,8 @@ internal PrimitiveColumnContainer<long> CloneAsLongContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<long> newBuffer = new DataFrameBuffer<long>();
DataFrameBuffer<long> newBuffer = new DataFrameBuffer<long>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -744,9 +721,8 @@ internal PrimitiveColumnContainer<ulong> CloneAsULongContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<ulong> newBuffer = new DataFrameBuffer<ulong>();
DataFrameBuffer<ulong> newBuffer = new DataFrameBuffer<ulong>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -764,9 +740,8 @@ internal PrimitiveColumnContainer<float> CloneAsFloatContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<float> newBuffer = new DataFrameBuffer<float>();
DataFrameBuffer<float> newBuffer = new DataFrameBuffer<float>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand Down
Loading
Loading