@@ -74,6 +74,7 @@ class SequenceBuilder {
7474        doubles_(::arrow::float64(), pool),
7575        date64s_(::arrow::date64(), pool),
7676        tensor_indices_(::arrow::int32(), pool),
77+         ndarray_indices_(::arrow::int32(), pool),
7778        buffer_indices_(::arrow::int32(), pool),
7879        list_offsets_({0 }),
7980        tuple_offsets_({0 }),
@@ -160,6 +161,14 @@ class SequenceBuilder {
160161    return  tensor_indices_.Append (tensor_index);
161162  }
162163
164+   // / Appending a numpy ndarray to the sequence
165+   // /
166+   // / \param ndarray_index Index of the ndarray in the object.
167+   Status AppendNdarray (const  int32_t  ndarray_index) {
168+     RETURN_NOT_OK (Update (ndarray_indices_.length (), &ndarray_tag_));
169+     return  ndarray_indices_.Append (ndarray_index);
170+   }
171+ 
163172  // / Appending a buffer to the sequence
164173  // /
165174  // / \param buffer_index Index of the buffer in the object.
@@ -265,6 +274,7 @@ class SequenceBuilder {
265274    RETURN_NOT_OK (AddElement (date64_tag_, &date64s_));
266275    RETURN_NOT_OK (AddElement (tensor_tag_, &tensor_indices_, " tensor" 
267276    RETURN_NOT_OK (AddElement (buffer_tag_, &buffer_indices_, " buffer" 
277+     RETURN_NOT_OK (AddElement (ndarray_tag_, &ndarray_indices_, " ndarray" 
268278
269279    RETURN_NOT_OK (AddSubsequence (list_tag_, list_data, list_offsets_, " list" 
270280    RETURN_NOT_OK (AddSubsequence (tuple_tag_, tuple_data, tuple_offsets_, " tuple" 
@@ -307,6 +317,7 @@ class SequenceBuilder {
307317  Date64Builder date64s_;
308318
309319  Int32Builder tensor_indices_;
320+   Int32Builder ndarray_indices_;
310321  Int32Builder buffer_indices_;
311322
312323  std::vector<int32_t > list_offsets_;
@@ -331,6 +342,7 @@ class SequenceBuilder {
331342
332343  int8_t  tensor_tag_ = -1 ;
333344  int8_t  buffer_tag_ = -1 ;
345+   int8_t  ndarray_tag_ = -1 ;
334346  int8_t  list_tag_ = -1 ;
335347  int8_t  tuple_tag_ = -1 ;
336348  int8_t  dict_tag_ = -1 ;
@@ -557,6 +569,11 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
557569    std::shared_ptr<Buffer> buffer;
558570    RETURN_NOT_OK (unwrap_buffer (elem, &buffer));
559571    blobs_out->buffers .push_back (buffer);
572+   } else  if  (is_tensor (elem)) {
573+     RETURN_NOT_OK (builder->AppendTensor (static_cast <int32_t >(blobs_out->tensors .size ())));
574+     std::shared_ptr<Tensor> tensor;
575+     RETURN_NOT_OK (unwrap_tensor (elem, &tensor));
576+     blobs_out->tensors .push_back (tensor);
560577  } else  {
561578    //  Attempt to serialize the object using the custom callback.
562579    PyObject* serialized_object;
@@ -584,11 +601,11 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
584601    case  NPY_FLOAT:
585602    case  NPY_DOUBLE: {
586603      RETURN_NOT_OK (
587-           builder->AppendTensor (static_cast <int32_t >(blobs_out->tensors .size ())));
604+           builder->AppendNdarray (static_cast <int32_t >(blobs_out->ndarrays .size ())));
588605      std::shared_ptr<Tensor> tensor;
589606      RETURN_NOT_OK (NdarrayToTensor (default_memory_pool (),
590607                                    reinterpret_cast <PyObject*>(array), &tensor));
591-       blobs_out->tensors .push_back (tensor);
608+       blobs_out->ndarrays .push_back (tensor);
592609    } break ;
593610    default : {
594611      PyObject* serialized_object;
@@ -757,11 +774,17 @@ Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
757774
758775Status SerializedPyObject::WriteTo (io::OutputStream* dst) {
759776  int32_t  num_tensors = static_cast <int32_t >(this ->tensors .size ());
777+   int32_t  num_ndarrays = static_cast <int32_t >(this ->ndarrays .size ());
760778  int32_t  num_buffers = static_cast <int32_t >(this ->buffers .size ());
761779  RETURN_NOT_OK (
762780      dst->Write (reinterpret_cast <const  uint8_t *>(&num_tensors), sizeof (int32_t )));
781+   RETURN_NOT_OK (
782+       dst->Write (reinterpret_cast <const  uint8_t *>(&num_ndarrays), sizeof (int32_t )));
763783  RETURN_NOT_OK (
764784      dst->Write (reinterpret_cast <const  uint8_t *>(&num_buffers), sizeof (int32_t )));
785+ 
786+   //  Align stream to 8-byte offset
787+   RETURN_NOT_OK (ipc::AlignStream (dst, ipc::kArrowIpcAlignment ));
765788  RETURN_NOT_OK (ipc::WriteRecordBatchStream ({this ->batch }, dst));
766789
767790  //  Align stream to 64-byte offset so tensor bodies are 64-byte aligned
@@ -774,6 +797,11 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
774797    RETURN_NOT_OK (ipc::AlignStream (dst, ipc::kTensorAlignment ));
775798  }
776799
800+   for  (const  auto & tensor : this ->ndarrays ) {
801+     RETURN_NOT_OK (ipc::WriteTensor (*tensor, dst, &metadata_length, &body_length));
802+     RETURN_NOT_OK (ipc::AlignStream (dst, ipc::kTensorAlignment ));
803+   }
804+ 
777805  for  (const  auto & buffer : this ->buffers ) {
778806    int64_t  size = buffer->size ();
779807    RETURN_NOT_OK (dst->Write (reinterpret_cast <const  uint8_t *>(&size), sizeof (int64_t )));
@@ -795,6 +823,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
795823  //  quite esoteric
796824  PyDict_SetItemString (result.obj (), " num_tensors" 
797825                       PyLong_FromSize_t (this ->tensors .size ()));
826+   PyDict_SetItemString (result.obj (), " num_ndarrays" 
827+                        PyLong_FromSize_t (this ->ndarrays .size ()));
798828  PyDict_SetItemString (result.obj (), " num_buffers" 
799829                       PyLong_FromSize_t (this ->buffers .size ()));
800830  PyDict_SetItemString (result.obj (), " data" 
@@ -835,6 +865,14 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
835865    RETURN_NOT_OK (PushBuffer (message->body ()));
836866  }
837867
868+   //  For each ndarray, get a metadata buffer and a buffer for the body
869+   for  (const  auto & ndarray : this ->ndarrays ) {
870+     std::unique_ptr<ipc::Message> message;
871+     RETURN_NOT_OK (ipc::GetTensorMessage (*ndarray, memory_pool, &message));
872+     RETURN_NOT_OK (PushBuffer (message->metadata ()));
873+     RETURN_NOT_OK (PushBuffer (message->body ()));
874+   }
875+ 
838876  for  (const  auto & buf : this ->buffers ) {
839877    RETURN_NOT_OK (PushBuffer (buf));
840878  }
0 commit comments