Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
Expand Down
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ void stream_record_batches(std::ostream& os, record_batch_source& source)

### Deserialize a stream into record batches

#### Using the function API

```cpp
#include <vector>
#include <sparrow_ipc/deserializer.hpp>
#include <sparrow_ipc/deserialize.hpp>
#include <sparrow/record_batch.hpp>

namespace sp = sparrow;
Expand All @@ -122,6 +124,36 @@ std::vector<sp::record_batch> deserialize_stream_to_batches(const std::vector<ui
}
```

#### Using the deserializer class

```cpp
#include <span>
#include <vector>
#include <sparrow_ipc/deserializer.hpp>
#include <sparrow/record_batch.hpp>

namespace sp = sparrow;
namespace sp_ipc = sparrow_ipc;

void deserialize_incremental_stream(const std::vector<std::vector<uint8_t>>& stream_chunks)
{
std::vector<sp::record_batch> batches;
sp_ipc::deserializer deser(batches);

// Deserialize chunks incrementally as they arrive
for (const auto& chunk : stream_chunks)
{
deser << std::span<const uint8_t>(chunk);
}

// Process accumulated batches
for (const auto& batch : batches)
{
// Process each batch...
}
}
```

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should still add this to examples file to ensure that the same code (as long as it's copied correctly) is running without failures (no typos etc).

## Documentation

The documentation (currently being written) can be found at https://quantstack.github.io/sparrow-ipc/index.html
Expand Down
48 changes: 48 additions & 0 deletions include/sparrow_ipc/deserializer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <cstddef>
#include <iterator>
#include <numeric>
#include <ranges>

#include <sparrow/record_batch.hpp>

#include "deserialize.hpp"
#include "sparrow_ipc/deserialize.hpp"

namespace sparrow_ipc
{
template <std::ranges::input_range R>
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
class deserializer
{
public:

deserializer(R& data)
: m_data(&data)
{
}

void deserialize(std::span<const uint8_t> data)
{
// Insert at the end of m_data container the deserialized record batches
auto& container = *m_data;
auto deserialized_batches = sparrow_ipc::deserialize_stream(data);
container.insert(
std::end(container),
std::make_move_iterator(std::begin(deserialized_batches)),
std::make_move_iterator(std::end(deserialized_batches))
);
}

deserializer& operator<<(std::span<const uint8_t> data)
{
deserialize(data);
return *this;
}

private:

R* m_data;
};
}
26 changes: 19 additions & 7 deletions src/deserialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ namespace sparrow_ipc
const size_t length = static_cast<size_t>(record_batch.length());
size_t buffer_index = 0;

const size_t num_fields = schema.fields() == nullptr ? 0 : static_cast<size_t>(schema.fields()->size());
std::vector<sparrow::array> arrays;
arrays.reserve(schema.fields()->size());
if (num_fields == 0)
{
return arrays;
}
arrays.reserve(num_fields);
size_t field_idx = 0;
for (const auto field : *(schema.fields()))
{
Expand Down Expand Up @@ -215,18 +220,24 @@ namespace sparrow_ipc
case org::apache::arrow::flatbuf::MessageHeader::Schema:
{
schema = message->header_as_Schema();
const size_t size = static_cast<size_t>(schema->fields()->size());
const size_t size = schema->fields() == nullptr
? 0
: static_cast<size_t>(schema->fields()->size());
field_names.reserve(size);
fields_nullable.reserve(size);
fields_metadata.reserve(size);

if (schema->fields() == nullptr)
{
break;
}
for (const auto field : *(schema->fields()))
{
if(field != nullptr && field->name() != nullptr)
if (field != nullptr && field->name() != nullptr)
{
field_names.emplace_back(field->name()->str());
field_names.emplace_back(field->name()->str());
}
else {
else
{
field_names.emplace_back("_unnamed_");
}
fields_nullable.push_back(field->nullable());
Expand Down Expand Up @@ -257,7 +268,8 @@ namespace sparrow_ipc
encapsulated_message,
fields_metadata
);
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of
// record_batch is fixed
sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays));
record_batches.emplace_back(std::move(sp_record_batch));
}
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(SPARROW_IPC_TESTS_SRC
test_chunk_memory_output_stream.cpp
test_chunk_memory_serializer.cpp
test_de_serialization_with_files.cpp
test_deserializer.cpp
$<$<NOT:$<BOOL:${SPARROW_IPC_BUILD_SHARED}>>:test_flatbuffer_utils.cpp>
test_memory_output_streams.cpp
test_serialize_utils.cpp
Expand Down
Loading
Loading