Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up PrimitiveColumnContainer #6656

Merged
2 changes: 1 addition & 1 deletion src/Microsoft.Data.Analysis/DataFrameBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Microsoft.Data.Analysis
/// </summary>
/// <typeparam name="T"></typeparam>
internal class DataFrameBuffer<T> : ReadOnlyDataFrameBuffer<T>
where T : struct
where T : unmanaged
{
private Memory<byte> _memory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Microsoft.Data.Analysis
{
internal partial class PrimitiveColumnContainer<T>
where T : struct
where T : unmanaged
{
public PrimitiveColumnContainer<T> Add(PrimitiveColumnContainer<T> right)
{
Expand Down
145 changes: 60 additions & 85 deletions src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Microsoft.Data.Analysis
/// </summary>
/// <typeparam name="T"></typeparam>
internal partial class PrimitiveColumnContainer<T> : IEnumerable<T?>
where T : struct
where T : unmanaged
{
public IList<ReadOnlyDataFrameBuffer<T>> Buffers = new List<ReadOnlyDataFrameBuffer<T>>();

Expand Down Expand Up @@ -90,6 +90,7 @@ public PrimitiveColumnContainer(ReadOnlyMemory<byte> buffer, ReadOnlyMemory<byte
dataBuffer = new ReadOnlyDataFrameBuffer<T>(buffer, length);
}
Buffers.Add(dataBuffer);

int bitMapBufferLength = (length + 7) / 8;
ReadOnlyDataFrameBuffer<byte> nullDataFrameBuffer;
if (nullBitMap.IsEmpty)
Expand Down Expand Up @@ -127,31 +128,7 @@ public PrimitiveColumnContainer(ReadOnlyMemory<byte> buffer, ReadOnlyMemory<byte

public PrimitiveColumnContainer(long length = 0)
{
while (length > 0)
{
if (Buffers.Count == 0)
{
Buffers.Add(new DataFrameBuffer<T>());
NullBitMapBuffers.Add(new DataFrameBuffer<byte>());
}
DataFrameBuffer<T> lastBuffer = (DataFrameBuffer<T>)Buffers[Buffers.Count - 1];
if (lastBuffer.Length == ReadOnlyDataFrameBuffer<T>.MaxCapacity)
{
lastBuffer = new DataFrameBuffer<T>();
Buffers.Add(lastBuffer);
NullBitMapBuffers.Add(new DataFrameBuffer<byte>());
}
int allocatable = (int)Math.Min(length, ReadOnlyDataFrameBuffer<T>.MaxCapacity);
lastBuffer.EnsureCapacity(allocatable);
DataFrameBuffer<byte> lastNullBitMapBuffer = (DataFrameBuffer<byte>)(NullBitMapBuffers[NullBitMapBuffers.Count - 1]);
int nullBufferAllocatable = (allocatable + 7) / 8;
lastNullBitMapBuffer.EnsureCapacity(nullBufferAllocatable);
lastBuffer.Length = allocatable;
lastNullBitMapBuffer.Length = nullBufferAllocatable;
length -= allocatable;
Length += lastBuffer.Length;
NullCount += lastBuffer.Length;
}
AppendMany(null, length);
JakeRadMSFT marked this conversation as resolved.
Show resolved Hide resolved
}

public void Resize(long length)
Expand All @@ -168,16 +145,14 @@ public void Append(T? value)
Buffers.Add(new DataFrameBuffer<T>());
NullBitMapBuffers.Add(new DataFrameBuffer<byte>());
}
int bufferIndex = Buffers.Count - 1;
ReadOnlyDataFrameBuffer<T> lastBuffer = Buffers[bufferIndex];
if (lastBuffer.Length == ReadOnlyDataFrameBuffer<T>.MaxCapacity)

if (Buffers[Buffers.Count - 1].Length == ReadOnlyDataFrameBuffer<T>.MaxCapacity)
{
lastBuffer = new DataFrameBuffer<T>();
Buffers.Add(lastBuffer);
Buffers.Add(new DataFrameBuffer<T>());
NullBitMapBuffers.Add(new DataFrameBuffer<byte>());
}
DataFrameBuffer<T> mutableLastBuffer = DataFrameBuffer<T>.GetMutableBuffer(lastBuffer);
Buffers[bufferIndex] = mutableLastBuffer;

DataFrameBuffer<T> mutableLastBuffer = Buffers.GetOrCreateMutable(Buffers.Count - 1);
mutableLastBuffer.Append(value ?? default);
SetValidityBit(Length, value.HasValue);
Length++;
Expand All @@ -190,90 +165,91 @@ public void AppendMany(T? value, long count)
NullCount += count;
}

while (count > 0)
var remaining = count;
while (remaining > 0)
{
if (Buffers.Count == 0)
{
Buffers.Add(new DataFrameBuffer<T>());
NullBitMapBuffers.Add(new DataFrameBuffer<byte>());
}
int bufferIndex = Buffers.Count - 1;
ReadOnlyDataFrameBuffer<T> lastBuffer = Buffers[bufferIndex];
if (lastBuffer.Length == ReadOnlyDataFrameBuffer<T>.MaxCapacity)

if (Buffers[Buffers.Count - 1].Length == ReadOnlyDataFrameBuffer<T>.MaxCapacity)
{
lastBuffer = new DataFrameBuffer<T>();
Buffers.Add(lastBuffer);
Buffers.Add(new DataFrameBuffer<T>());
NullBitMapBuffers.Add(new DataFrameBuffer<byte>());
}
DataFrameBuffer<T> mutableLastBuffer = DataFrameBuffer<T>.GetMutableBuffer(lastBuffer);
Buffers[bufferIndex] = mutableLastBuffer;
int allocatable = (int)Math.Min(count, ReadOnlyDataFrameBuffer<T>.MaxCapacity);

DataFrameBuffer<T> mutableLastBuffer = Buffers.GetOrCreateMutable(Buffers.Count - 1);
int allocatable = (int)Math.Min(remaining, ReadOnlyDataFrameBuffer<T>.MaxCapacity);
mutableLastBuffer.EnsureCapacity(allocatable);
mutableLastBuffer.RawSpan.Slice(lastBuffer.Length, allocatable).Fill(value ?? default);

DataFrameBuffer<byte> lastNullBitMapBuffer = NullBitMapBuffers.GetOrCreateMutable(NullBitMapBuffers.Count - 1);
int nullBufferAllocatable = (allocatable + 7) / 8;
lastNullBitMapBuffer.EnsureCapacity(nullBufferAllocatable);


mutableLastBuffer.Length += allocatable;
lastNullBitMapBuffer.Length += nullBufferAllocatable;
Length += allocatable;

int nullBitMapBufferIndex = NullBitMapBuffers.Count - 1;
ReadOnlyDataFrameBuffer<byte> lastNullBitMapBuffer = NullBitMapBuffers[nullBitMapBufferIndex];
DataFrameBuffer<byte> mutableLastNullBitMapBuffer = DataFrameBuffer<byte>.GetMutableBuffer(lastNullBitMapBuffer);
NullBitMapBuffers[nullBitMapBufferIndex] = mutableLastNullBitMapBuffer;
int nullBitMapAllocatable = (int)(((uint)allocatable) / 8) + 1;
mutableLastNullBitMapBuffer.EnsureCapacity(nullBitMapAllocatable);
_modifyNullCountWhileIndexing = false;
for (long i = Length - count; i < Length; i++)
if (value.HasValue)
{
SetValidityBit(i, value.HasValue ? true : false);
mutableLastBuffer.RawSpan.Slice(mutableLastBuffer.Length - allocatable, allocatable).Fill(value ?? default);

_modifyNullCountWhileIndexing = false;
for (long i = Length - allocatable; i < Length; i++)
{
SetValidityBit(i, value.HasValue);
}
_modifyNullCountWhileIndexing = true;
}
_modifyNullCountWhileIndexing = true;
count -= allocatable;


remaining -= allocatable;
}
}

public void ApplyElementwise(Func<T?, long, T?> func)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what is the contract for this method? It's going to swap out the buffer in the array with a new one -- what if someone's holding on to the old one? How do you handle reference invalidation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just inherited this but my understanding is the following:

  • This method is going to apply func for each element and update the value in buffer
    • It will make the buffer mutable if it needs to
    • It will update the nullBitMapBuffer

If something is holding on to the old buffer - it shouldn't be/it's outdated.

{
var bufferMaxCapacity = ReadOnlyDataFrameBuffer<T>.MaxCapacity;
for (int b = 0; b < Buffers.Count; b++)
{
ReadOnlyDataFrameBuffer<T> buffer = Buffers[b];
long prevLength = checked(Buffers[0].Length * b);
DataFrameBuffer<T> mutableBuffer = DataFrameBuffer<T>.GetMutableBuffer(buffer);
Buffers[b] = mutableBuffer;
Span<T> span = mutableBuffer.Span;
DataFrameBuffer<byte> mutableNullBitMapBuffer = DataFrameBuffer<byte>.GetMutableBuffer(NullBitMapBuffers[b]);
NullBitMapBuffers[b] = mutableNullBitMapBuffer;
Span<byte> nullBitMapSpan = mutableNullBitMapBuffer.Span;
for (int i = 0; i < span.Length; i++)
long prevLength = checked(bufferMaxCapacity * b);

Span<T> mutableBuffer = Buffers.GetOrCreateMutable(b).Span;
Span<byte> mutableNullBitMapBuffer = NullBitMapBuffers.GetOrCreateMutable(b).Span;

for (int i = 0; i < mutableBuffer.Length; i++)
{
long curIndex = i + prevLength;
bool isValid = IsValid(nullBitMapSpan, i);
T? value = func(isValid ? span[i] : default(T?), curIndex);
span[i] = value.GetValueOrDefault();
SetValidityBit(nullBitMapSpan, i, value != null);
bool isValid = IsValid(mutableNullBitMapBuffer, i);
T? value = func(isValid ? mutableBuffer[i] : null, curIndex);
mutableBuffer[i] = value.GetValueOrDefault();
SetValidityBit(mutableNullBitMapBuffer, i, value != null);
}
}
}

public void Apply<TResult>(Func<T?, TResult?> func, PrimitiveColumnContainer<TResult> resultContainer)
where TResult : unmanaged
{
var bufferMaxCapacity = ReadOnlyDataFrameBuffer<T>.MaxCapacity;
for (int b = 0; b < Buffers.Count; b++)
{
ReadOnlyDataFrameBuffer<T> sourceBuffer = Buffers[b];
ReadOnlySpan<byte> sourceNullBitMap = NullBitMapBuffers[b].ReadOnlySpan;
long prevLength = checked(bufferMaxCapacity * b);
var sourceBuffer = Buffers[b];
var sourceNullBitMap = NullBitMapBuffers[b].ReadOnlySpan;

ReadOnlyDataFrameBuffer<TResult> resultBuffer = resultContainer.Buffers[b];
DataFrameBuffer<TResult> resultMutableBuffer = DataFrameBuffer<TResult>.GetMutableBuffer(resultBuffer);
resultContainer.Buffers[b] = resultMutableBuffer;
Span<TResult> resultSpan = resultMutableBuffer.Span;
DataFrameBuffer<byte> resultMutableNullBitMapBuffer = DataFrameBuffer<byte>.GetMutableBuffer(resultContainer.NullBitMapBuffers[b]);
resultContainer.NullBitMapBuffers[b] = resultMutableNullBitMapBuffer;
Span<byte> resultNullBitMapSpan = resultMutableNullBitMapBuffer.Span;
Span<TResult> mutableResultBuffer = resultContainer.Buffers.GetOrCreateMutable(b).Span;
Span<byte> mutableResultNullBitMapBuffers = resultContainer.NullBitMapBuffers.GetOrCreateMutable(b).Span;

for (int i = 0; i < Buffers[b].Length; i++)
for (int i = 0; i < sourceBuffer.Length; i++)
{
bool isValid = IsValid(sourceNullBitMap, i);
TResult? value = func(isValid ? sourceBuffer[i] : default(T?));
resultSpan[i] = value.GetValueOrDefault();
resultContainer.SetValidityBit(resultNullBitMapSpan, i, value != null);
TResult? value = func(isValid ? sourceBuffer[i] : null);
mutableResultBuffer[i] = value.GetValueOrDefault();
resultContainer.SetValidityBit(mutableResultNullBitMapBuffers, i, value != null);
}
}
}
Expand Down Expand Up @@ -440,11 +416,10 @@ public T? this[long rowIndex]
{
int arrayIndex = GetArrayContainingRowIndex(rowIndex);
rowIndex = rowIndex - arrayIndex * ReadOnlyDataFrameBuffer<T>.MaxCapacity;
ReadOnlyDataFrameBuffer<T> buffer = Buffers[arrayIndex];
DataFrameBuffer<T> mutableBuffer = DataFrameBuffer<T>.GetMutableBuffer(buffer);
Buffers[arrayIndex] = mutableBuffer;
DataFrameBuffer<byte> mutableNullBuffer = DataFrameBuffer<byte>.GetMutableBuffer(NullBitMapBuffers[arrayIndex]);
NullBitMapBuffers[arrayIndex] = mutableNullBuffer;

Buffers.GetOrCreateMutable(arrayIndex);
NullBitMapBuffers.GetOrCreateMutable(arrayIndex);

if (value.HasValue)
{
Buffers[arrayIndex][(int)rowIndex] = value.Value;
Expand Down
27 changes: 27 additions & 0 deletions src/Microsoft.Data.Analysis/PrimitiveColumnContainerHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;

namespace Microsoft.Data.Analysis
{
internal static class PrimitiveColumnContainerHelpers
{
internal static DataFrameBuffer<T> GetOrCreateMutable<T>(this IList<ReadOnlyDataFrameBuffer<T>> bufferList, int index)
where T : unmanaged
{
ReadOnlyDataFrameBuffer<T> sourceBuffer = bufferList[index];
DataFrameBuffer<T> mutableBuffer = sourceBuffer as DataFrameBuffer<T>;

if (mutableBuffer == null)
{
mutableBuffer = DataFrameBuffer<T>.GetMutableBuffer(sourceBuffer);
bufferList[index] = mutableBuffer;
}


return mutableBuffer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Microsoft.Data.Analysis
{
internal interface IPrimitiveDataFrameColumnArithmetic<T>
where T : struct
where T : unmanaged
{
void Add(PrimitiveColumnContainer<T> left, PrimitiveColumnContainer<T> right);
void Add(PrimitiveColumnContainer<T> column, T scalar);
Expand Down Expand Up @@ -54,15 +54,15 @@ internal interface IPrimitiveDataFrameColumnArithmetic<T>
}

internal static class PrimitiveDataFrameColumnArithmetic<T>
where T : struct
where T : unmanaged
{
public static IPrimitiveDataFrameColumnArithmetic<T> Instance { get; } = PrimitiveDataFrameColumnArithmetic.GetArithmetic<T>();
}

internal static class PrimitiveDataFrameColumnArithmetic
{
public static IPrimitiveDataFrameColumnArithmetic<T> GetArithmetic<T>()
where T : struct
where T : unmanaged
{
if (typeof(T) == typeof(bool))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace Microsoft.Data.Analysis
{
internal interface IPrimitiveColumnComputation<T>
where T : struct
where T : unmanaged
{
void Abs(PrimitiveColumnContainer<T> column);
void All(PrimitiveColumnContainer<T> column, out bool ret);
Expand All @@ -37,15 +37,15 @@ internal interface IPrimitiveColumnComputation<T>
}

internal static class PrimitiveColumnComputation<T>
where T : struct
where T : unmanaged
{
public static IPrimitiveColumnComputation<T> Instance { get; } = PrimitiveColumnComputation.GetComputation<T>();
}

internal static class PrimitiveColumnComputation
{
public static IPrimitiveColumnComputation<T> GetComputation<T>()
where T : struct
where T : unmanaged
{
if (typeof(T) == typeof(bool))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Data.Analysis/ReadOnlyDataFrameBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Data.Analysis
/// </summary>
/// <typeparam name="T"></typeparam>
internal class ReadOnlyDataFrameBuffer<T>
where T : struct
where T : unmanaged
JakeRadMSFT marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly ReadOnlyMemory<byte> _readOnlyBuffer;

Expand Down