Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/BufferOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

extern "C"
{
PARQUETSHARP_EXPORT ExceptionInfo* BufferOutputStream_Create(std::shared_ptr<arrow::io::BufferOutputStream>** output_stream)
PARQUETSHARP_EXPORT ExceptionInfo* BufferOutputStream_Create(::arrow::MemoryPool* pool, std::shared_ptr<arrow::io::BufferOutputStream>** output_stream)
{
TRYCATCH
(
if (pool == nullptr) {
pool = ::arrow::default_memory_pool();
}
PARQUET_ASSIGN_OR_THROW(
const std::shared_ptr<arrow::io::BufferOutputStream> output,
arrow::io::BufferOutputStream::Create(1024, arrow::default_memory_pool()));
arrow::io::BufferOutputStream::Create(1024, pool));

*output_stream = new std::shared_ptr<arrow::io::BufferOutputStream>(output);
)
Expand Down
23 changes: 22 additions & 1 deletion cpp/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,28 @@ extern "C"
{
TRYCATCH(*memory_pool = arrow::default_memory_pool();)
}


// Writes the pool returned by arrow::system_memory_pool() to *memory_pool.
// The function has no explicit return statement: the ExceptionInfo* result is
// produced by the TRYCATCH macro, which is defined outside this view.
PARQUETSHARP_EXPORT ExceptionInfo* MemoryPool_System_Memory_Pool(const arrow::MemoryPool** memory_pool)
{
    TRYCATCH
    (
        const arrow::MemoryPool* const system_pool = arrow::system_memory_pool();
        *memory_pool = system_pool;
    )
}

// Asks Arrow for its jemalloc-backed memory pool, which arrow::jemalloc_memory_pool
// stores through the memory_pool out-parameter.
// The status it returns is handed to PARQUET_THROW_NOT_OK inside TRYCATCH, so a
// non-OK status is reported through the ExceptionInfo* result rather than ignored.
PARQUETSHARP_EXPORT ExceptionInfo* MemoryPool_Jemalloc_Memory_Pool(arrow::MemoryPool** memory_pool)
{
    TRYCATCH(PARQUET_THROW_NOT_OK(arrow::jemalloc_memory_pool(memory_pool));)
}

// Asks Arrow for its mimalloc-backed memory pool, which arrow::mimalloc_memory_pool
// stores through the memory_pool out-parameter.
// The status it returns is handed to PARQUET_THROW_NOT_OK inside TRYCATCH, so a
// non-OK status is reported through the ExceptionInfo* result rather than ignored.
PARQUETSHARP_EXPORT ExceptionInfo* MemoryPool_Mimalloc_Memory_Pool(arrow::MemoryPool** memory_pool)
{
    TRYCATCH(PARQUET_THROW_NOT_OK(arrow::mimalloc_memory_pool(memory_pool));)
}

PARQUETSHARP_EXPORT ExceptionInfo* MemoryPool_Bytes_Allocated(const arrow::MemoryPool* memory_pool, int64_t* bytes_allocated)
{
TRYCATCH(*bytes_allocated = memory_pool->bytes_allocated();)
Expand Down
13 changes: 13 additions & 0 deletions cpp/ReaderProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ extern "C"
TRYCATCH(*reader_properties = new ReaderProperties(default_reader_properties());)
}

// Creates a new ReaderProperties bound to the given memory pool and returns it
// through the reader_properties out-parameter.
//
// memory_pool:       pool the reader properties should use. A null pointer selects
//                    ::arrow::default_memory_pool(), so the properties never carry
//                    a null pool that a later memory_pool() caller would dereference.
// reader_properties: receives the heap-allocated instance; it is released with
//                    ReaderProperties_Free.
PARQUETSHARP_EXPORT ExceptionInfo* ReaderProperties_With_Memory_Pool(::arrow::MemoryPool* memory_pool, ReaderProperties** reader_properties)
{
    TRYCATCH
    (
        if (memory_pool == nullptr) {
            memory_pool = ::arrow::default_memory_pool();
        }
        *reader_properties = new ReaderProperties(memory_pool);
    )
}

PARQUETSHARP_EXPORT void ReaderProperties_Free(ReaderProperties* reader_properties)
{
delete reader_properties;
Expand Down Expand Up @@ -71,4 +76,12 @@ extern "C"
{
TRYCATCH(reader_properties->set_page_checksum_verification(false);)
}

// Reads the memory pool configured on reader_properties and writes it to *memory_pool.
// Only the pointer is copied out; no pool is created or released here.
PARQUETSHARP_EXPORT ExceptionInfo* ReaderProperties_Get_Memory_Pool(const ReaderProperties* reader_properties, ::arrow::MemoryPool** memory_pool)
{
    TRYCATCH(
        ::arrow::MemoryPool* const configured_pool = reader_properties->memory_pool();
        *memory_pool = configured_pool;
    )
}
}
4 changes: 2 additions & 2 deletions cpp/ResizableBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

extern "C"
{
// Allocates an Arrow resizable buffer of initialSize bytes and returns it through
// the buffer out-parameter as a heap-allocated std::shared_ptr.
// This span is a diff hunk: the first signature line and the first allocation line
// are the pre-change versions; the line that follows each one is its replacement,
// which adds the memory_pool parameter and forwards it to AllocateResizableBuffer.
PARQUETSHARP_EXPORT ExceptionInfo* ResizableBuffer_Create(const int64_t initialSize, std::shared_ptr<arrow::ResizableBuffer>** buffer)
PARQUETSHARP_EXPORT ExceptionInfo* ResizableBuffer_Create(const int64_t initialSize, ::arrow::MemoryPool* memory_pool, std::shared_ptr<arrow::ResizableBuffer>** buffer)
{
TRYCATCH(
auto pBuffer = arrow::AllocateResizableBuffer(initialSize);
auto pBuffer = arrow::AllocateResizableBuffer(initialSize, memory_pool);
// Ownership moves from the unique_ptr held in the result into the new shared_ptr.
// NOTE(review): ValueOrDie() presumably aborts the process on a failed allocation
// instead of throwing, which would bypass TRYCATCH - consider PARQUET_ASSIGN_OR_THROW
// as used in BufferOutputStream_Create; confirm against the Arrow Result docs.
*buffer = new std::shared_ptr<arrow::ResizableBuffer>(pBuffer.ValueOrDie().release());
)
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/WriterProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,9 @@ extern "C"
delete[] descending;
delete[] nulls_first;
}

// Reads the memory pool configured on the WriterProperties held by the given
// shared_ptr handle and writes it to *memory_pool.
PARQUETSHARP_EXPORT ExceptionInfo* WriterProperties_Memory_Pool(const std::shared_ptr<WriterProperties>* writer_properties, ::arrow::MemoryPool** memory_pool)
{
    TRYCATCH
    (
        auto& properties = **writer_properties;
        *memory_pool = properties.memory_pool();
    )
}
}
5 changes: 5 additions & 0 deletions cpp/WriterPropertiesBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,9 @@ extern "C"

TRYCATCH(builder->set_sorting_columns(sorting_columns);)
}

// Sets the memory pool that writer properties built from this builder will use.
//
// builder: the builder to configure.
// pool:    pool to use. A null pointer selects ::arrow::default_memory_pool(), so the
//          built properties never carry a null pool that a later memory_pool()
//          caller would dereference.
PARQUETSHARP_EXPORT ExceptionInfo* WriterPropertiesBuilder_Memory_Pool(WriterProperties::Builder* builder, ::arrow::MemoryPool* pool)
{
    TRYCATCH
    (
        if (pool == nullptr) {
            pool = ::arrow::default_memory_pool();
        }
        builder->memory_pool(pool);
    )
}
}
9 changes: 8 additions & 1 deletion cpp/arrow/FileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ extern "C"
{
TRYCATCH
(
arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::MemoryPool* pool = reader_properties == nullptr
? arrow::default_memory_pool()
: reader_properties->memory_pool();
std::shared_ptr<arrow::io::ReadableFile> input_file;
std::unique_ptr<FileReader> reader_ptr;
PARQUET_ASSIGN_OR_THROW(input_file, arrow::io::ReadableFile::Open(path, pool));
FileReaderBuilder builder;
builder.memory_pool(pool);
PARQUET_THROW_NOT_OK(builder.Open(input_file, *reader_properties));
if (arrow_reader_properties != nullptr) {
builder.properties(*arrow_reader_properties);
Expand All @@ -45,6 +48,10 @@ extern "C"
(
std::unique_ptr<FileReader> reader_ptr;
FileReaderBuilder builder;
arrow::MemoryPool* pool = reader_properties == nullptr
? arrow::default_memory_pool()
: reader_properties->memory_pool();
builder.memory_pool(pool);
PARQUET_THROW_NOT_OK(builder.Open(*readable_file_interface, *reader_properties));
if (arrow_reader_properties != nullptr) {
builder.properties(*arrow_reader_properties);
Expand Down
8 changes: 6 additions & 2 deletions cpp/arrow/FileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ extern "C"
{
TRYCATCH
(
arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::MemoryPool* pool = writer_properties == nullptr
? arrow::default_memory_pool()
: (*writer_properties)->memory_pool();

std::shared_ptr<::arrow::io::OutputStream> output_stream;
PARQUET_ASSIGN_OR_THROW(output_stream, ::arrow::io::FileOutputStream::Open(path));
Expand Down Expand Up @@ -54,7 +56,9 @@ extern "C"
{
TRYCATCH
(
arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::MemoryPool* pool = writer_properties == nullptr
? arrow::default_memory_pool()
: (*writer_properties)->memory_pool();

std::shared_ptr<parquet::WriterProperties> properties = writer_properties == nullptr
? parquet::default_writer_properties()
Expand Down
32 changes: 32 additions & 0 deletions csharp.test/Arrow/TestFileReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,38 @@ public void TestAccessSchemaManifestFieldAfterDisposed()
Assert.That(exception!.Message, Does.Contain("owning parent has been disposed"));
}

/// <summary>
/// Reads the test file back through a reader configured with the memory pool of the
/// given test case, then checks that the expected pool reports allocated bytes and
/// that every written row was read.
/// </summary>
[TestCaseSource(typeof(MemoryPools), nameof(MemoryPools.TestCases))]
public async Task TestReadWithMemoryPool(MemoryPools.TestCase memoryPool)
{
    // The file contents always live in a system-pool buffer, whichever pool is under test.
    using var buffer = new ResizableBuffer(memoryPool: MemoryPool.SystemMemoryPool());
    WriteTestFile(buffer);

    var pool = memoryPool.Pool;
    using var readerProperties = pool != null
        ? ReaderProperties.WithMemoryPool(pool)
        : ReaderProperties.GetDefaultReaderProperties();

    using var inStream = new BufferReader(buffer);
    using var fileReader = new FileReader(inStream, readerProperties);
    using var batchReader = fileReader.GetRecordBatchReader();

    var rowsRead = 0;
    // A null batch ends the loop; each non-null batch is disposed once its rows are counted.
    while (await batchReader.ReadNextRecordBatchAsync() is { } batch)
    {
        using (batch)
        {
            rowsRead += batch.Length;
        }
    }

    // With a null test-case pool the default reader properties were used, so the
    // default memory pool is the one checked for allocations.
    var expectedPool = pool ?? MemoryPool.GetDefaultMemoryPool();
    Assert.That(expectedPool.BytesAllocated, Is.GreaterThan(0));

    Assert.That(rowsRead, Is.EqualTo(RowsPerRowGroup * NumRowGroups));
}

private static void WriteTestFile(ResizableBuffer buffer)
{
var columns = new Column[]
Expand Down
14 changes: 11 additions & 3 deletions csharp.test/Arrow/TestFileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ public async Task TestWriteRecordBatch()
await VerifyData(inStream, numRows);
}

[Test]
public async Task TestWriteBufferedRecordBatches()
[TestCaseSource(typeof(MemoryPools), nameof(MemoryPools.TestCases))]
public async Task TestWriteBufferedRecordBatches(MemoryPools.TestCase memoryPool)
{
var fields = new[]
{
Expand All @@ -152,10 +152,16 @@ RecordBatch GetBatch(int xVal, int numRows)
return new RecordBatch(schema, arrays, numRows);
}

using var buffer = new ResizableBuffer();
using var buffer = new ResizableBuffer(memoryPool: MemoryPool.SystemMemoryPool());
using (var outStream = new BufferOutputStream(buffer))
{
using var propertiesBuilder = new WriterPropertiesBuilder();
if (memoryPool.Pool != null)
{
propertiesBuilder.MemoryPool(memoryPool.Pool);
}
var expectedPool = memoryPool.Pool ?? MemoryPool.GetDefaultMemoryPool();

propertiesBuilder.MaxRowGroupLength(250);
using var writerProperties = propertiesBuilder.Build();
using var writer = new FileWriter(outStream, schema, writerProperties);
Expand All @@ -165,6 +171,8 @@ RecordBatch GetBatch(int xVal, int numRows)
using var batch1 = GetBatch(0, 100);
writer.WriteBufferedRecordBatch(batch1);

Assert.That(expectedPool.BytesAllocated, Is.GreaterThan(0));

writer.NewBufferedRowGroup();

using var batch2 = GetBatch(1, 100);
Expand Down
64 changes: 64 additions & 0 deletions csharp.test/MemoryPools.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Collections.Generic;
using System.Linq;

namespace ParquetSharp.Test
{
/// <summary>
/// Supplies NUnit test cases wrapping the memory pools that tests are run against.
/// </summary>
public class MemoryPools
{
    /// <summary>
    /// Used as an NUnit TestCaseSource to test methods with different memory pools.
    /// Always yields a null pool and the system pool, followed by the mimalloc and
    /// jemalloc pools when creating them does not throw a ParquetException.
    /// </summary>
    public static TestCase[] TestCases()
    {
        var candidates = new List<MemoryPool?> {null, MemoryPool.SystemMemoryPool()};

        AddIfAvailable(candidates, () => MemoryPool.MimallocMemoryPool());
        AddIfAvailable(candidates, () => MemoryPool.JemallocMemoryPool());

        var cases = new TestCase[candidates.Count];
        for (var i = 0; i < cases.Length; ++i)
        {
            cases[i] = new TestCase(candidates[i]);
        }

        return cases;
    }

    /// <summary>
    /// Used as an NUnit TestCaseSource to test methods with different memory pools.
    /// Excludes the null pool.
    /// </summary>
    public static TestCase[] NonNullTestCases()
    {
        return (from testCase in TestCases() where testCase.Pool != null select testCase).ToArray();
    }

    /// <summary>
    /// Appends the pool produced by <paramref name="createPool"/> to <paramref name="pools"/>,
    /// leaving the list unchanged when creation throws a ParquetException.
    /// </summary>
    private static void AddIfAvailable(List<MemoryPool?> pools, System.Func<MemoryPool> createPool)
    {
        try
        {
            pools.Add(createPool());
        }
        catch (ParquetException)
        {
            // This allocator backend is not available
        }
    }

    /// <summary>
    /// A single test case: the memory pool to use, or null for no explicit pool.
    /// </summary>
    public class TestCase
    {
        public TestCase(MemoryPool? memoryPool) => Pool = memoryPool;

        public MemoryPool? Pool { get; }

        /// <summary>
        /// Returns the pool's backend name, or "null" when there is none.
        /// </summary>
        public override string ToString() => Pool?.BackendName ?? "null";
    }
}
}
14 changes: 8 additions & 6 deletions csharp.test/TestBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ public static unsafe void TestParquetReadFromBuffer()
}
}

[Test]
public static void TestBufferOutputStreamFinish()
[TestCaseSource(typeof(MemoryPools), nameof(MemoryPools.TestCases))]
public static void TestBufferOutputStreamFinish(MemoryPools.TestCase memoryPool)
{
var expected = Enumerable.Range(0, 100).ToArray();
using var outStream = new BufferOutputStream();
using var outStream = memoryPool.Pool == null
? new BufferOutputStream()
: new BufferOutputStream(memoryPool.Pool);

// Write out a single column
using (var fileWriter = new ParquetFileWriter(outStream, new Column[] {new Column<int>("int_field")}))
Expand All @@ -86,10 +88,10 @@ public static void TestBufferOutputStreamFinish()
Assert.AreEqual(expected, allData);
}

[Test]
public static void TestResizeBuffer()
[TestCaseSource(typeof(MemoryPools), nameof(MemoryPools.TestCases))]
public static void TestResizeBuffer(MemoryPools.TestCase memoryPool)
{
using var buffer = new ResizableBuffer(initialSize: 128);
using var buffer = new ResizableBuffer(initialSize: 128, memoryPool: memoryPool.Pool);
const int newLength = 256;
buffer.Resize(newLength);
var values = Enumerable.Range(0, newLength).Select(i => (byte) i).ToArray();
Expand Down
Loading