Skip to content

Commit 06793ba

Browse files
Allow C Data Interface export of ReadOnlyMemory-backed buffers (#112)
This PR partly addresses #111 by allowing C Data Interface export of buffers backed by `ReadOnlyMemory` rather than an `IMemoryOwner`. This is useful if, for example, you want to export data that has been read from an IPC file or stream. Previously, you would first need to copy all buffers for this to work. Because this is an experimental and potentially risky feature, it must currently be enabled by setting the global static `CArrowArrayExporter.EnableManagedMemoryExport` to `true`. This change was originally submitted by @adamreeve as [https://github.com/apache/arrow/pull/40992](https://github.com/apache/arrow/pull/40992). My only contribution is disabling the feature by default.
1 parent 7ff7375 commit 06793ba

File tree

5 files changed

+232
-5
lines changed

5 files changed

+232
-5
lines changed

src/Apache.Arrow/ArrowBuffer.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Buffers;
1818
using System.Runtime.CompilerServices;
19+
using Apache.Arrow.C;
1920
using Apache.Arrow.Memory;
2021

2122
namespace Apache.Arrow
@@ -89,6 +90,13 @@ internal bool TryExport(ExportedAllocationOwner newOwner, out IntPtr ptr)
8990
return true;
9091
}
9192

93+
if (_memoryOwner == null && CArrowArrayExporter.EnableManagedMemoryExport)
94+
{
95+
var handle = _memory.Pin();
96+
ptr = newOwner.Reference(handle);
97+
return true;
98+
}
99+
92100
ptr = IntPtr.Zero;
93101
return false;
94102
}

src/Apache.Arrow/C/CArrowArrayExporter.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ namespace Apache.Arrow.C
2626
{
2727
public static class CArrowArrayExporter
2828
{
29+
/// <summary>
30+
/// Experimental feature to enable exporting managed memory to CArrowArray. Use with caution.
31+
/// </summary>
32+
public static bool EnableManagedMemoryExport = false;
33+
2934
#if NET5_0_OR_GREATER
3035
private static unsafe delegate* unmanaged<CArrowArray*, void> ReleaseArrayPtr => &ReleaseArray;
3136
#else

src/Apache.Arrow/Memory/ExportedAllocationOwner.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@
1414
// limitations under the License.
1515

1616
using System;
17+
using System.Buffers;
1718
using System.Collections.Generic;
18-
using System.Diagnostics;
1919
using System.Runtime.InteropServices;
2020
using System.Threading;
2121

2222
namespace Apache.Arrow.Memory
2323
{
24-
internal sealed class ExportedAllocationOwner : INativeAllocationOwner, IDisposable
24+
internal sealed class ExportedAllocationOwner : IDisposable
2525
{
2626
private readonly List<IntPtr> _pointers = new List<IntPtr>();
27+
private readonly List<MemoryHandle> _handles = new List<MemoryHandle>();
2728
private long _allocationSize;
2829
private long _referenceCount;
2930
private bool _disposed;
@@ -46,9 +47,10 @@ public IntPtr Acquire(IntPtr ptr, int offset, int length)
4647
return ptr;
4748
}
4849

49-
public void Release(IntPtr ptr, int offset, int length)
50+
public unsafe IntPtr Reference(MemoryHandle handle)
5051
{
51-
throw new InvalidOperationException();
52+
_handles.Add(handle);
53+
return new IntPtr(handle.Pointer);
5254
}
5355

5456
public void IncRef()
@@ -70,6 +72,7 @@ public void Dispose()
7072
{
7173
return;
7274
}
75+
7376
for (int i = 0; i < _pointers.Count; i++)
7477
{
7578
if (_pointers[i] != IntPtr.Zero)
@@ -78,6 +81,13 @@ public void Dispose()
7881
_pointers[i] = IntPtr.Zero;
7982
}
8083
}
84+
85+
for (int i = 0; i < _handles.Count; i++)
86+
{
87+
_handles[i].Dispose();
88+
_handles[i] = default;
89+
}
90+
8191
GC.RemoveMemoryPressure(_allocationSize);
8292
GC.SuppressFinalize(this);
8393
_disposed = true;

src/Apache.Arrow/Memory/NativeMemoryManager.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace Apache.Arrow.Memory
2323
public class NativeMemoryManager : MemoryManager<byte>, IOwnableAllocation
2424
{
2525
private IntPtr _ptr;
26+
private int _pinCount;
2627
private readonly int _offset;
2728
private readonly int _length;
2829
private readonly INativeAllocationOwner _owner;
@@ -58,14 +59,15 @@ public override unsafe MemoryHandle Pin(int elementIndex = 0)
5859
// NOTE: Unmanaged memory doesn't require GC pinning because by definition it's not
5960
// managed by the garbage collector.
6061

62+
Interlocked.Increment(ref _pinCount);
6163
void* ptr = CalculatePointer(elementIndex);
6264
return new MemoryHandle(ptr, default, this);
6365
}
6466

6567
public override void Unpin()
6668
{
6769
// SEE: Pin implementation
68-
return;
70+
Interlocked.Decrement(ref _pinCount);
6971
}
7072

7173
protected override void Dispose(bool disposing)
@@ -74,6 +76,17 @@ protected override void Dispose(bool disposing)
7476
IntPtr ptr = Interlocked.Exchange(ref _ptr, IntPtr.Zero);
7577
if (ptr != IntPtr.Zero)
7678
{
79+
if (disposing)
80+
{
81+
// Only need to check for pinned data when disposing.
82+
// If disposed from the finalizer, there can be no MemoryHandles to this memory.
83+
if (_pinCount > 0)
84+
{
85+
_ptr = ptr;
86+
throw new InvalidOperationException("cannot free native memory while it is pinned");
87+
}
88+
}
89+
7790
_owner.Release(ptr, _offset, _length);
7891
}
7992
}

test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,64 @@ public unsafe void ExportArray()
722722
CArrowSchema.Free(cSchema);
723723
}
724724

725+
// Exports an Int64 array built on managed byte[] buffers through the C Data Interface,
// imports it with pyarrow and compares it with a pyarrow array built from the same values.
// Weak references to the managed buffers are checked twice: they must still be alive
// after a forced GC while the export exists, and must be dead after the export is released.
[SkippableFact]
726+
public unsafe void ExportManagedMemoryArray()
727+
{
728+
// Managed-memory export is opt-in; turn it on for the duration of this test only.
using (new EnableManagedExport())
729+
{
730+
// 0..99 with every value whose last digit is 3 replaced by null.
var expectedValues = Enumerable.Range(0, 100).Select(i => i % 10 == 3 ? null : (long?)i).ToArray();
731+
var gcRefs = new List<WeakReference>();
732+
733+
// NOTE(review): presumably a local function so that no local in the outer scope
// keeps the array or its buffers reachable during the final collection - confirm.
void TestExport()
734+
{
735+
var array = CreateManagedMemoryArray(expectedValues, gcRefs);
736+
737+
dynamic pyArray;
738+
using (Py.GIL())
739+
{
740+
dynamic pa = Py.Import("pyarrow");
741+
pyArray = pa.array(expectedValues);
742+
}
743+
744+
CArrowArray* cArray = CArrowArray.Create();
745+
CArrowArrayExporter.ExportArray(array, cArray);
746+
747+
CArrowSchema* cSchema = CArrowSchema.Create();
748+
CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema);
749+
750+
// While the export is outstanding the managed buffers are expected to survive a forced GC.
GcCollect();
751+
foreach (var weakRef in gcRefs)
752+
{
753+
Assert.True(weakRef.IsAlive);
754+
}
755+
756+
// pyarrow's _import_from_c takes the struct addresses as plain integers.
long arrayPtr = ((IntPtr)cArray).ToInt64();
757+
long schemaPtr = ((IntPtr)cSchema).ToInt64();
758+
759+
using (Py.GIL())
760+
{
761+
dynamic pa = Py.Import("pyarrow");
762+
dynamic exportedPyArray = pa.Array._import_from_c(arrayPtr, schemaPtr);
763+
Assert.True(exportedPyArray == pyArray);
764+
765+
// Required for the Python object to be garbage collected:
766+
exportedPyArray.Dispose();
767+
}
768+
769+
CArrowArray.Free(cArray);
770+
CArrowSchema.Free(cSchema);
771+
}
772+
773+
TestExport();
774+
775+
// After the export has been released, the managed buffers are expected to be collectable.
GcCollect();
776+
foreach (var weakRef in gcRefs)
777+
{
778+
Assert.False(weakRef.IsAlive);
779+
}
780+
}
781+
}
782+
725783
[SkippableFact]
726784
public unsafe void ExportBatch()
727785
{
@@ -930,6 +988,95 @@ public unsafe void ImportRecordBatchFromBuffer()
930988
}
931989
}
932990

991+
// Round-trips the test record batch through an in-memory Arrow IPC stream, exports the
// batch that was read back through the C Data Interface, imports it with pyarrow and
// compares it with the reference Python record batch.
[SkippableFact]
992+
public async Task ExportBatchReadFromIpc()
993+
{
994+
var originalBatch = GetTestRecordBatch();
995+
dynamic pyBatch = GetPythonRecordBatch();
996+
997+
// Managed-memory export is opt-in; turn it on for the duration of this test only.
using (new EnableManagedExport())
998+
using (var stream = new MemoryStream())
999+
{
1000+
var writer = new ArrowStreamWriter(stream, originalBatch.Schema);
1001+
await writer.WriteRecordBatchAsync(originalBatch);
1002+
await writer.WriteEndAsync();
1003+
1004+
// Rewind so the reader starts at the beginning of what was just written.
stream.Seek(0, SeekOrigin.Begin);
1005+
1006+
var reader = new ArrowStreamReader(stream);
1007+
using var batch = await reader.ReadNextRecordBatchAsync();
1008+
1009+
Assert.NotNull(batch);
1010+
Assert.Equal(originalBatch.Length, batch.Length);
1011+
1012+
// Pointer-typed locals are not allowed directly in an async method body, hence the unsafe block.
unsafe
1013+
{
1014+
CArrowArray* cArray = CArrowArray.Create();
1015+
CArrowArrayExporter.ExportRecordBatch(batch, cArray);
1016+
1017+
CArrowSchema* cSchema = CArrowSchema.Create();
1018+
CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);
1019+
1020+
// pyarrow's _import_from_c takes the struct addresses as plain integers.
long arrayPtr = ((IntPtr)cArray).ToInt64();
1021+
long schemaPtr = ((IntPtr)cSchema).ToInt64();
1022+
1023+
using (Py.GIL())
1024+
{
1025+
dynamic pa = Py.Import("pyarrow");
1026+
dynamic exportedPyArray = pa.RecordBatch._import_from_c(arrayPtr, schemaPtr);
1027+
Assert.True(exportedPyArray == pyBatch);
1028+
1029+
// Dispose to unpin memory
1030+
exportedPyArray.Dispose();
1031+
}
1032+
1033+
CArrowArray.Free(cArray);
1034+
CArrowSchema.Free(cSchema);
1035+
}
1036+
}
1037+
}
1038+
1039+
// Checks that disposing a record batch read from an IPC stream is rejected with
// InvalidOperationException while a C Data Interface export of that batch is still outstanding.
[SkippableFact]
1040+
public async Task EarlyDisposeOfExportedBatch()
1041+
{
1042+
// Reading IPC data from a Stream creates Arrow buffers backed by ReadOnlyMemory that point
1043+
// to slices of a single memory buffer owned by the RecordBatch (unless compression is used).
1044+
// Using the exported data after the RecordBatch has been disposed could cause
1045+
// memory corruption or access violations.
1046+
1047+
var originalBatch = GetTestRecordBatch();
1048+
1049+
// Managed-memory export is opt-in; turn it on for the duration of this test only.
using (new EnableManagedExport())
1050+
using (var stream = new MemoryStream())
1051+
{
1052+
var writer = new ArrowStreamWriter(stream, originalBatch.Schema);
1053+
await writer.WriteRecordBatchAsync(originalBatch);
1054+
await writer.WriteEndAsync();
1055+
1056+
// Rewind so the reader starts at the beginning of what was just written.
stream.Seek(0, SeekOrigin.Begin);
1057+
1058+
var reader = new ArrowStreamReader(stream);
1059+
using var batch = await reader.ReadNextRecordBatchAsync();
1060+
1061+
Assert.NotNull(batch);
1062+
Assert.Equal(originalBatch.Length, batch.Length);
1063+
1064+
// Pointer-typed locals are not allowed directly in an async method body, hence the unsafe block.
unsafe
1065+
{
1066+
CArrowArray* cArray = CArrowArray.Create();
1067+
CArrowArrayExporter.ExportRecordBatch(batch, cArray);
1068+
1069+
CArrowSchema* cSchema = CArrowSchema.Create();
1070+
CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);
1071+
1072+
// The export has not been released yet, so disposing the batch must fail.
// NOTE(review): presumably raised by the pinned-memory check in
// NativeMemoryManager.Dispose - confirm.
Assert.Throws<InvalidOperationException>(() => batch.Dispose());
1073+
1074+
// Free the exported structs before `using var batch` disposes the batch again at scope exit.
CArrowArray.Free(cArray);
1075+
CArrowSchema.Free(cSchema);
1076+
}
1077+
}
1078+
}
1079+
9331080
private static PyObject List(params int?[] values)
9341081
{
9351082
return new PyList(values.Select(i => i == null ? PyObject.None : new PyInt(i.Value)).ToArray());
@@ -960,6 +1107,34 @@ private static PyObject Tuple(params int?[] values)
9601107
return new PyTuple(values.Select(i => i == null ? PyObject.None : new PyInt(i.Value)).ToArray());
9611108
}
9621109

1110+
// Builds an Int64Array whose value and validity buffers are plain managed byte arrays.
// A WeakReference to each of the two arrays is appended to gcRefs so that callers can
// observe whether the buffers are still reachable.
//   values: the logical contents; null entries become nulls in the resulting array.
//   gcRefs: list that receives the two weak references (data first, then validity).
private static IArrowArray CreateManagedMemoryArray(long?[] values, List<WeakReference> gcRefs)
1111+
{
1112+
var data = new byte[values.Length * sizeof(long)];
1113+
var validity = new byte[BitUtility.ByteCount(values.Length)];
1114+
// View the raw byte buffer as longs so values can be written by index.
var typedData = data.AsSpan().CastTo<long>();
1115+
var nullCount = 0;
1116+
for (var i = 0; i < values.Length; ++i)
1117+
{
1118+
BitUtility.SetBit(validity, i, values[i].HasValue);
1119+
// Null slots are stored as 0 in the value buffer.
typedData[i] = values[i].GetValueOrDefault(0);
1120+
nullCount += values[i].HasValue ? 0 : 1;
1121+
}
1122+
1123+
gcRefs.Add(new WeakReference(data));
1124+
gcRefs.Add(new WeakReference(validity));
1125+
1126+
// NOTE(review): the trailing 0 is presumably the array offset - confirm against the Int64Array constructor.
return new Int64Array(new ArrowBuffer(data), new ArrowBuffer(validity), values.Length, nullCount, 0);
1127+
}
1128+
1129+
// Forces three rounds of garbage collection, waiting for pending finalizers after each round.
// Used by the tests before they inspect WeakReference.IsAlive.
private static void GcCollect()
1130+
{
1131+
for (int i = 0; i < 3; ++i)
1132+
{
1133+
GC.Collect();
1134+
GC.WaitForPendingFinalizers();
1135+
}
1136+
}
1137+
9631138
sealed class TestArrayStream : IArrowArrayStream
9641139
{
9651140
private readonly RecordBatch[] _batches;
@@ -986,5 +1161,21 @@ public void Dispose()
9861161
_index = -1;
9871162
}
9881163
}
1164+
1165+
// Test helper that sets CArrowArrayExporter.EnableManagedMemoryExport to true on construction
// and restores the value it had before when disposed. Intended for use in a using statement.
// NOTE(review): the flag is a global static, so tests running in parallel could observe
// each other's setting - confirm how these tests are scheduled.
sealed class EnableManagedExport : IDisposable
1166+
{
1167+
// Value of the flag captured in the constructor, written back in Dispose.
readonly bool _previousValue;
1168+
1169+
public EnableManagedExport()
1170+
{
1171+
_previousValue = CArrowArrayExporter.EnableManagedMemoryExport;
1172+
CArrowArrayExporter.EnableManagedMemoryExport = true;
1173+
}
1174+
1175+
public void Dispose()
1176+
{
1177+
CArrowArrayExporter.EnableManagedMemoryExport = _previousValue;
1178+
}
1179+
}
9891180
}
9901181
}

0 commit comments

Comments
 (0)