Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Apache.Arrow/ArrowBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using Apache.Arrow.C;
using Apache.Arrow.Memory;

namespace Apache.Arrow
Expand Down Expand Up @@ -89,6 +90,13 @@ internal bool TryExport(ExportedAllocationOwner newOwner, out IntPtr ptr)
return true;
}

if (_memoryOwner == null && CArrowArrayExporter.EnableManagedMemoryExport)
{
var handle = _memory.Pin();
ptr = newOwner.Reference(handle);
return true;
}

ptr = IntPtr.Zero;
return false;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ namespace Apache.Arrow.C
{
public static class CArrowArrayExporter
{
/// <summary>
/// Experimental feature to enable exporting managed memory to CArrowArray. Use with caution.
/// </summary>
public static bool EnableManagedMemoryExport = false;

#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowArray*, void> ReleaseArrayPtr => &ReleaseArray;
#else
Expand Down
18 changes: 14 additions & 4 deletions src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
// limitations under the License.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;

namespace Apache.Arrow.Memory
{
internal sealed class ExportedAllocationOwner : INativeAllocationOwner, IDisposable
internal sealed class ExportedAllocationOwner : IDisposable
{
private readonly List<IntPtr> _pointers = new List<IntPtr>();
private readonly List<MemoryHandle> _handles = new List<MemoryHandle>();
private long _allocationSize;
private long _referenceCount;
private bool _disposed;
Expand All @@ -46,9 +47,10 @@ public IntPtr Acquire(IntPtr ptr, int offset, int length)
return ptr;
}

public void Release(IntPtr ptr, int offset, int length)
public unsafe IntPtr Reference(MemoryHandle handle)
{
throw new InvalidOperationException();
_handles.Add(handle);
return new IntPtr(handle.Pointer);
}

public void IncRef()
Expand All @@ -70,6 +72,7 @@ public void Dispose()
{
return;
}

for (int i = 0; i < _pointers.Count; i++)
{
if (_pointers[i] != IntPtr.Zero)
Expand All @@ -78,6 +81,13 @@ public void Dispose()
_pointers[i] = IntPtr.Zero;
}
}

for (int i = 0; i < _handles.Count; i++)
{
_handles[i].Dispose();
_handles[i] = default;
}

GC.RemoveMemoryPressure(_allocationSize);
GC.SuppressFinalize(this);
_disposed = true;
Expand Down
15 changes: 14 additions & 1 deletion src/Apache.Arrow/Memory/NativeMemoryManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Apache.Arrow.Memory
public class NativeMemoryManager : MemoryManager<byte>, IOwnableAllocation
{
private IntPtr _ptr;
private int _pinCount;
private readonly int _offset;
private readonly int _length;
private readonly INativeAllocationOwner _owner;
Expand Down Expand Up @@ -58,14 +59,15 @@ public override unsafe MemoryHandle Pin(int elementIndex = 0)
// NOTE: Unmanaged memory doesn't require GC pinning because by definition it's not
// managed by the garbage collector.

Interlocked.Increment(ref _pinCount);
void* ptr = CalculatePointer(elementIndex);
return new MemoryHandle(ptr, default, this);
}

public override void Unpin()
{
// SEE: Pin implementation
return;
Interlocked.Decrement(ref _pinCount);
}

protected override void Dispose(bool disposing)
Expand All @@ -74,6 +76,17 @@ protected override void Dispose(bool disposing)
IntPtr ptr = Interlocked.Exchange(ref _ptr, IntPtr.Zero);
if (ptr != IntPtr.Zero)
{
if (disposing)
{
// Only need to check for pinned data when disposing.
// If disposed from the finalizer, there can be no MemoryHandles to this memory.
if (_pinCount > 0)
{
_ptr = ptr;
throw new InvalidOperationException("cannot free native memory while it is pinned");
}
}

_owner.Release(ptr, _offset, _length);
}
}
Expand Down
191 changes: 191 additions & 0 deletions test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,64 @@ public unsafe void ExportArray()
CArrowSchema.Free(cSchema);
}

[SkippableFact]
public unsafe void ExportManagedMemoryArray()
{
using (new EnableManagedExport())
{
var expectedValues = Enumerable.Range(0, 100).Select(i => i % 10 == 3 ? null : (long?)i).ToArray();
var gcRefs = new List<WeakReference>();

void TestExport()
{
var array = CreateManagedMemoryArray(expectedValues, gcRefs);

dynamic pyArray;
using (Py.GIL())
{
dynamic pa = Py.Import("pyarrow");
pyArray = pa.array(expectedValues);
}

CArrowArray* cArray = CArrowArray.Create();
CArrowArrayExporter.ExportArray(array, cArray);

CArrowSchema* cSchema = CArrowSchema.Create();
CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema);

GcCollect();
foreach (var weakRef in gcRefs)
{
Assert.True(weakRef.IsAlive);
}

long arrayPtr = ((IntPtr)cArray).ToInt64();
long schemaPtr = ((IntPtr)cSchema).ToInt64();

using (Py.GIL())
{
dynamic pa = Py.Import("pyarrow");
dynamic exportedPyArray = pa.Array._import_from_c(arrayPtr, schemaPtr);
Assert.True(exportedPyArray == pyArray);

// Required for the Python object to be garbage collected:
exportedPyArray.Dispose();
}

CArrowArray.Free(cArray);
CArrowSchema.Free(cSchema);
}

TestExport();

GcCollect();
foreach (var weakRef in gcRefs)
{
Assert.False(weakRef.IsAlive);
}
}
}

[SkippableFact]
public unsafe void ExportBatch()
{
Expand Down Expand Up @@ -930,6 +988,95 @@ public unsafe void ImportRecordBatchFromBuffer()
}
}

[SkippableFact]
public async Task ExportBatchReadFromIpc()
{
var originalBatch = GetTestRecordBatch();
dynamic pyBatch = GetPythonRecordBatch();

using (new EnableManagedExport())
using (var stream = new MemoryStream())
{
var writer = new ArrowStreamWriter(stream, originalBatch.Schema);
await writer.WriteRecordBatchAsync(originalBatch);
await writer.WriteEndAsync();

stream.Seek(0, SeekOrigin.Begin);

var reader = new ArrowStreamReader(stream);
using var batch = await reader.ReadNextRecordBatchAsync();

Assert.NotNull(batch);
Assert.Equal(originalBatch.Length, batch.Length);

unsafe
{
CArrowArray* cArray = CArrowArray.Create();
CArrowArrayExporter.ExportRecordBatch(batch, cArray);

CArrowSchema* cSchema = CArrowSchema.Create();
CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);

long arrayPtr = ((IntPtr)cArray).ToInt64();
long schemaPtr = ((IntPtr)cSchema).ToInt64();

using (Py.GIL())
{
dynamic pa = Py.Import("pyarrow");
dynamic exportedPyArray = pa.RecordBatch._import_from_c(arrayPtr, schemaPtr);
Assert.True(exportedPyArray == pyBatch);

// Dispose to unpin memory
exportedPyArray.Dispose();
}

CArrowArray.Free(cArray);
CArrowSchema.Free(cSchema);
}
}
}

[SkippableFact]
public async Task EarlyDisposeOfExportedBatch()
{
// Reading IPC data from a Stream creates Arrow buffers backed by ReadOnlyMemory that point
// to slices of a single memory buffer owned by the RecordBatch (unless compression is used).
// Using the exported data after the RecordBatch has been disposed could cause
// memory corruption or access violations.

var originalBatch = GetTestRecordBatch();

using (new EnableManagedExport())
using (var stream = new MemoryStream())
{
var writer = new ArrowStreamWriter(stream, originalBatch.Schema);
await writer.WriteRecordBatchAsync(originalBatch);
await writer.WriteEndAsync();

stream.Seek(0, SeekOrigin.Begin);

var reader = new ArrowStreamReader(stream);
using var batch = await reader.ReadNextRecordBatchAsync();

Assert.NotNull(batch);
Assert.Equal(originalBatch.Length, batch.Length);

unsafe
{
CArrowArray* cArray = CArrowArray.Create();
CArrowArrayExporter.ExportRecordBatch(batch, cArray);

CArrowSchema* cSchema = CArrowSchema.Create();
CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);

Assert.Throws<InvalidOperationException>(() => batch.Dispose());

CArrowArray.Free(cArray);
CArrowSchema.Free(cSchema);
}
}
}

private static PyObject List(params int?[] values)
{
return new PyList(values.Select(i => i == null ? PyObject.None : new PyInt(i.Value)).ToArray());
Expand Down Expand Up @@ -960,6 +1107,34 @@ private static PyObject Tuple(params int?[] values)
return new PyTuple(values.Select(i => i == null ? PyObject.None : new PyInt(i.Value)).ToArray());
}

private static IArrowArray CreateManagedMemoryArray(long?[] values, List<WeakReference> gcRefs)
{
var data = new byte[values.Length * sizeof(long)];
var validity = new byte[BitUtility.ByteCount(values.Length)];
var typedData = data.AsSpan().CastTo<long>();
var nullCount = 0;
for (var i = 0; i < values.Length; ++i)
{
BitUtility.SetBit(validity, i, values[i].HasValue);
typedData[i] = values[i].GetValueOrDefault(0);
nullCount += values[i].HasValue ? 0 : 1;
}

gcRefs.Add(new WeakReference(data));
gcRefs.Add(new WeakReference(validity));

return new Int64Array(new ArrowBuffer(data), new ArrowBuffer(validity), values.Length, nullCount, 0);
}

private static void GcCollect()
{
for (int i = 0; i < 3; ++i)
{
GC.Collect();
GC.WaitForPendingFinalizers();
}
}

sealed class TestArrayStream : IArrowArrayStream
{
private readonly RecordBatch[] _batches;
Expand All @@ -986,5 +1161,21 @@ public void Dispose()
_index = -1;
}
}

sealed class EnableManagedExport : IDisposable
{
readonly bool _previousValue;

public EnableManagedExport()
{
_previousValue = CArrowArrayExporter.EnableManagedMemoryExport;
CArrowArrayExporter.EnableManagedMemoryExport = true;
}

public void Dispose()
{
CArrowArrayExporter.EnableManagedMemoryExport = _previousValue;
}
}
}
}