Skip to content

Commit 8467490

Browse files
BrianXu0623Brian Xu
andauthored
Support python custom message builder and make Data field's type return MemoryView (#380)
This PR is for resolving the following issue: [issue](#379) 1. Created `_PyCustomMessageBuilder` extends `MessageBuilder`, enabling the ability to customise the `SegmentAllocate` method in Python. This allows allocation and data population within shared memory, and supports zero-copy inter-process data transfer by passing segment offsets. 2. Fields of type `Data` now support being set with a `memoryview`. When retrieving a `Data` field from a `DynamicStructBuilder`, it will return a writable `memoryview`, allowing users to modify the data directly. This enables memory to be pre-allocated and content to be modified in later, eliminating an extra copy. When retrieving a `Data` field from a `DynamicStructReader`, it will return a read-only `memoryview`, allowing user to read data without memory copy. * add memoryview and custom builder * support set dynamic field * add curSize * add initialSize and lastSize * change StringPtr name * add test case * refine test case * convert func to py callable object * add initial value * refine example * add copy as_reader and new_message, make structReader's data field return RO memoryView * rebase master and bugfix * reformat flake8 * refine test case * refine test cases for blob * remove unused import for flake8 * run black . --------- Co-authored-by: Brian Xu <brian.xu1@bytedance.com>
1 parent 6268297 commit 8467490

10 files changed

+304
-17
lines changed

capnp/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
_InterfaceModule,
4949
_ListSchema,
5050
_MallocMessageBuilder,
51+
_PyCustomMessageBuilder,
5152
_PackedFdMessageReader,
5253
_StreamFdMessageReader,
5354
_StructModule,
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#include "PyCustomMessageBuilder.h"
2+
#include <stdexcept>
3+
4+
namespace capnp {
5+
6+
PyCustomMessageBuilder::PyCustomMessageBuilder(
7+
PyObject* allocateSegmentCallable, uint firstSegmentWords)
8+
: allocateSegmentCallable(allocateSegmentCallable), firstSize(firstSegmentWords)
9+
{
10+
KJ_REQUIRE(PyCallable_Check(allocateSegmentCallable),
11+
"allocateSegmentCallable must be callable");
12+
Py_INCREF(allocateSegmentCallable);
13+
}
14+
15+
PyCustomMessageBuilder::~PyCustomMessageBuilder() noexcept(false) {
16+
PyGILState_STATE gstate = PyGILState_Ensure();
17+
18+
for (auto* obj : allocatedBuffers) {
19+
Py_DECREF(obj);
20+
}
21+
allocatedBuffers.clear();
22+
23+
Py_DECREF(allocateSegmentCallable);
24+
PyGILState_Release(gstate);
25+
}
26+
27+
kj::ArrayPtr<capnp::word> PyCustomMessageBuilder::allocateSegment(capnp::uint minimumSize) {
28+
PyGILState_STATE gstate = PyGILState_Ensure();
29+
KJ_DEFER({ PyGILState_Release(gstate); });
30+
if (curSize == 0) {
31+
minimumSize = kj::max(minimumSize, firstSize);
32+
}
33+
PyObject* pyBufObj = PyObject_CallFunction(allocateSegmentCallable, "I", minimumSize);
34+
KJ_REQUIRE(pyBufObj, "PyCustomMessageBuilder: allocateSegment failed");
35+
allocatedBuffers.push_back(pyBufObj);
36+
37+
38+
Py_buffer view;
39+
int bufRes = PyObject_GetBuffer(pyBufObj, &view, PyBUF_SIMPLE);
40+
KJ_REQUIRE(bufRes == 0, "PyCustomMessageBuilder: object does not support buffer protocol");
41+
KJ_DEFER({ PyBuffer_Release(&view); });
42+
43+
size_t byteCount = view.len;
44+
size_t wordCount = byteCount / sizeof(capnp::word);
45+
KJ_REQUIRE(wordCount >= minimumSize, "PyCustomMessageBuilder: buffer too small for minimumSize");
46+
curSize += wordCount;
47+
return kj::arrayPtr(reinterpret_cast<capnp::word*>(view.buf), wordCount);
48+
}
49+
50+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
#include "Python.h"
4+
#include <capnp/message.h>
5+
#include <capnp/serialize.h>
6+
#include <vector>
7+
8+
namespace capnp {
9+
10+
class PyCustomMessageBuilder : public capnp::MessageBuilder {
11+
public:
12+
explicit PyCustomMessageBuilder(PyObject* allocateSegmentCallable,
13+
uint firstSegmentWords = capnp::SUGGESTED_FIRST_SEGMENT_WORDS);
14+
15+
~PyCustomMessageBuilder() noexcept(false) override;
16+
17+
kj::ArrayPtr<capnp::word> allocateSegment(capnp::uint minimumSize) override;
18+
19+
private:
20+
PyObject* allocateSegmentCallable;
21+
22+
uint firstSize;
23+
uint curSize = 0;
24+
25+
std::vector<PyObject*> allocatedBuffers;
26+
};
27+
28+
}

capnp/includes/schema_cpp.pxd

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,11 @@ cdef extern from "capnp/message.h" namespace " ::capnp":
714714
enum Void:
715715
VOID
716716

717+
cdef extern from "PyCustomMessageBuilder.h" namespace " ::capnp":
718+
cdef cppclass PyCustomMessageBuilder(MessageBuilder):
719+
PyCustomMessageBuilder(PyObject* allocateSegmentCallable)
720+
PyCustomMessageBuilder(PyObject* allocateSegmentCallable, int firstSegmentSize)
721+
717722
cdef extern from "capnp/common.h" namespace " ::capnp":
718723
cdef cppclass word nogil:
719724
pass

capnp/lib/capnp.pxd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ cdef class _DynamicStructReader:
6666
cpdef _get_by_field(self, _StructSchemaField field)
6767
cpdef _has_by_field(self, _StructSchemaField field)
6868

69-
cpdef as_builder(self, num_first_segment_words=?)
69+
cpdef as_builder(self, num_first_segment_words=?, allocate_seg_callable=?)
7070

7171

7272
cdef class _DynamicStructBuilder:
@@ -99,7 +99,7 @@ cdef class _DynamicStructBuilder:
9999
cpdef disown(self, field)
100100

101101
cpdef as_reader(self)
102-
cpdef copy(self, num_first_segment_words=?)
102+
cpdef copy(self, num_first_segment_words=?, allocate_seg_callable=?)
103103

104104
cdef class _DynamicEnumField:
105105
cdef object thisptr

capnp/lib/capnp.pyx

Lines changed: 117 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ from capnp.helpers.helpers cimport init_capnp_api
1313
from capnp.includes.capnp_cpp cimport AsyncIoStream, WaitScope, PyPromise, VoidPromise, EventPort, EventLoop, PyAsyncIoStream, PromiseFulfiller, VoidPromiseFulfiller, tryReadMessage, writeMessage, makeException, PythonInterfaceDynamicImpl
1414
from capnp.includes.schema_cpp cimport (MessageReader,)
1515

16+
from builtins import memoryview as BuiltinsMemoryview
1617
from cpython cimport array, Py_buffer, PyObject_CheckBuffer
17-
from cpython.buffer cimport PyBUF_SIMPLE, PyBUF_WRITABLE, PyBUF_WRITE, PyBUF_READ
18+
from cpython.buffer cimport PyBUF_SIMPLE, PyBUF_WRITABLE, PyBUF_WRITE, PyBUF_READ, PyBUF_CONTIG_RO
1819
from cpython.memoryview cimport PyMemoryView_FromMemory
1920
from cpython.exc cimport PyErr_Clear
2021
from cython.operator cimport dereference as deref
@@ -667,7 +668,7 @@ cdef to_python_reader(C_DynamicValue.Reader self, object parent):
667668
return (<char*>temp_text.begin())[:temp_text.size()]
668669
elif type == capnp.TYPE_DATA:
669670
temp_data = self.asData()
670-
return <bytes>((<char*>temp_data.begin())[:temp_data.size()])
671+
return PyMemoryView_FromMemory(<char *> temp_data.begin(), temp_data.size(), PyBUF_READ)
671672
elif type == capnp.TYPE_LIST:
672673
return _DynamicListReader()._init(self.asList(), parent)
673674
elif type == capnp.TYPE_STRUCT:
@@ -701,7 +702,7 @@ cdef to_python_builder(C_DynamicValue.Builder self, object parent):
701702
return (<char*>temp_text.begin())[:temp_text.size()]
702703
elif type == capnp.TYPE_DATA:
703704
temp_data = self.asData()
704-
return <bytes>((<char*>temp_data.begin())[:temp_data.size()])
705+
return PyMemoryView_FromMemory(<char *> temp_data.begin(), temp_data.size(), PyBUF_WRITE)
705706
elif type == capnp.TYPE_LIST:
706707
return _DynamicListBuilder()._init(self.asList(), parent)
707708
elif type == capnp.TYPE_STRUCT:
@@ -766,6 +767,20 @@ cdef _setBytes(_DynamicSetterClasses thisptr, field, value):
766767
cdef C_DynamicValue.Reader temp = C_DynamicValue.Reader(temp_string)
767768
thisptr.set(field, temp)
768769

770+
cdef _setMemoryview(_DynamicSetterClasses thisptr, field, value):
771+
cdef Py_buffer buf
772+
cdef capnp.StringPtr temp_string
773+
cdef C_DynamicValue.Reader temp
774+
if PyObject_GetBuffer(value, &buf, PyBUF_CONTIG_RO) != 0:
775+
raise KjException(
776+
"cannot get buffer from memory view, for field '{}'".format(field)
777+
)
778+
try:
779+
temp_string = capnp.StringPtr(<char *> buf.buf, buf.len)
780+
temp = C_DynamicValue.Reader(temp_string)
781+
thisptr.set(field, temp)
782+
finally:
783+
PyBuffer_Release(&buf)
769784

770785
cdef _setBaseString(_DynamicSetterClasses thisptr, field, value):
771786
encoded_value = value.encode('utf-8')
@@ -779,6 +794,20 @@ cdef _setBytesField(DynamicStruct_Builder thisptr, _StructSchemaField field, val
779794
cdef C_DynamicValue.Reader temp = C_DynamicValue.Reader(temp_string)
780795
thisptr.setByField(field.thisptr, temp)
781796

797+
cdef _setMemoryviewField(DynamicStruct_Builder thisptr, _StructSchemaField field, value):
798+
cdef Py_buffer buf
799+
cdef capnp.StringPtr temp_string
800+
cdef C_DynamicValue.Reader temp
801+
if PyObject_GetBuffer(value, &buf, PyBUF_CONTIG_RO) != 0:
802+
raise KjException(
803+
"cannot get buffer from memory view, for field '{}'".format(field)
804+
)
805+
try:
806+
temp_string = capnp.StringPtr(<char *>buf.buf, buf.len)
807+
temp = C_DynamicValue.Reader(temp_string)
808+
thisptr.setByField(field.thisptr, temp)
809+
finally:
810+
PyBuffer_Release(&buf)
782811

783812
cdef _setBaseStringField(DynamicStruct_Builder thisptr, _StructSchemaField field, value):
784813
encoded_value = value.encode('utf-8')
@@ -805,6 +834,8 @@ cdef _setDynamicField(_DynamicSetterClasses thisptr, field, value, parent):
805834
thisptr.set(field, temp)
806835
elif value_type is bytes:
807836
_setBytes(thisptr, field, value)
837+
elif isinstance(value, BuiltinsMemoryview):
838+
_setMemoryview(thisptr, field, value)
808839
elif isinstance(value, basestring):
809840
_setBaseString(thisptr, field, value)
810841
elif value_type is list:
@@ -870,6 +901,8 @@ cdef _setDynamicFieldWithField(DynamicStruct_Builder thisptr, _StructSchemaField
870901
thisptr.setByField(field.thisptr, temp)
871902
elif value_type is bytes:
872903
_setBytesField(thisptr, field, value)
904+
elif isinstance(value, BuiltinsMemoryview):
905+
_setMemoryviewField(thisptr, field, value)
873906
elif isinstance(value, basestring):
874907
_setBaseStringField(thisptr, field, value)
875908
elif value_type is list:
@@ -1242,19 +1275,28 @@ cdef class _DynamicStructReader:
12421275
def to_dict(self, verbose=False, ordered=False, encode_bytes_as_base64=False):
12431276
return _to_dict(self, verbose, ordered, encode_bytes_as_base64)
12441277

1245-
cpdef as_builder(self, num_first_segment_words=None):
1278+
cpdef as_builder(self, num_first_segment_words=None, allocate_seg_callable=None):
12461279
"""A method for casting this Reader to a Builder
12471280
12481281
This is a copying operation with respect to the message's buffer.
12491282
Changes in the new builder will not reflect in the original reader.
12501283
12511284
:type num_first_segment_words: int
12521285
:param num_first_segment_words: Size of the first segment to allocate (in words ie. 8 byte increments)
1286+
1287+
:type allocate_seg_callable: Callable[[int], bytearray]
1288+
:param allocate_seg_callable: A python callable object that takes the minimum number of 8-byte
1289+
words to allocate (as an `int`) and returns a `bytearray`. This is used to customize the memory
1290+
allocation strategy.
12531291
12541292
:rtype: :class:`_DynamicStructBuilder`
12551293
"""
1256-
builder = _MallocMessageBuilder(num_first_segment_words)
1257-
return builder.set_root(self)
1294+
if allocate_seg_callable is None:
1295+
builder = _MallocMessageBuilder(num_first_segment_words)
1296+
return builder.set_root(self)
1297+
else:
1298+
builder = _PyCustomMessageBuilder(allocate_seg_callable, num_first_segment_words)
1299+
return builder.set_root(self)
12581300

12591301
property total_size:
12601302
def __get__(self):
@@ -1593,19 +1635,28 @@ cdef class _DynamicStructBuilder:
15931635
reader._obj_to_pin = self
15941636
return reader
15951637

1596-
cpdef copy(self, num_first_segment_words=None):
1638+
cpdef copy(self, num_first_segment_words=None, allocate_seg_callable=None):
15971639
"""A method for copying this Builder
15981640
15991641
This is a copying operation with respect to the message's buffer.
16001642
Changes in the new builder will not reflect in the original reader.
16011643
16021644
:type num_first_segment_words: int
16031645
:param num_first_segment_words: Size of the first segment to allocate (in words ie. 8 byte increments)
1646+
1647+
:type allocate_seg_callable: Callable[[int], bytearray]
1648+
:param allocate_seg_callable: A python callable object that takes the minimum number of 8-byte
1649+
words to allocate (as an `int`) and returns a `bytearray`. This is used to customize the memory
1650+
allocation strategy.
16041651
16051652
:rtype: :class:`_DynamicStructBuilder`
16061653
"""
1607-
builder = _MallocMessageBuilder(num_first_segment_words)
1608-
return builder.set_root(self)
1654+
if allocate_seg_callable is None:
1655+
builder = _MallocMessageBuilder(num_first_segment_words)
1656+
return builder.set_root(self)
1657+
else:
1658+
builder = _PyCustomMessageBuilder(allocate_seg_callable, num_first_segment_words)
1659+
return builder.set_root(self)
16091660

16101661
property schema:
16111662
"""A property that returns the _StructSchema object matching this writer"""
@@ -3145,8 +3196,12 @@ class _StructABCMeta(type):
31453196
return isinstance(obj, cls.__base__) and obj.schema == cls._schema
31463197

31473198

3148-
cdef _new_message(self, kwargs, num_first_segment_words):
3149-
builder = _MallocMessageBuilder(num_first_segment_words)
3199+
cdef _new_message(self, kwargs, num_first_segment_words, allocate_seg_callable):
3200+
cdef _MessageBuilder builder
3201+
if allocate_seg_callable is None:
3202+
builder = _MallocMessageBuilder(num_first_segment_words)
3203+
else:
3204+
builder = _PyCustomMessageBuilder(allocate_seg_callable, num_first_segment_words)
31503205
msg = builder.init_root(self.schema)
31513206
if kwargs is not None:
31523207
msg.from_dict(kwargs)
@@ -3387,12 +3442,17 @@ class _StructModule(object):
33873442
def __call__(self, num_first_segment_words=None, **kwargs):
33883443
return self.new_message(num_first_segment_words=num_first_segment_words, **kwargs)
33893444

3390-
def new_message(self, num_first_segment_words=None, **kwargs):
3445+
def new_message(self, num_first_segment_words=None, allocate_seg_callable=None, **kwargs):
33913446
"""Returns a newly allocated builder message.
33923447
33933448
:type num_first_segment_words: int
33943449
:param num_first_segment_words: Size of the first segment to allocate (in words ie. 8 byte increments)
33953450
3451+
:type allocate_seg_callable: Callable[[int], bytearray]
3452+
:param allocate_seg_callable: A python callable object that takes the minimum number of 8-byte
3453+
words to allocate (as an `int`) and returns a `bytearray`. This is used to customize the memory
3454+
allocation strategy.
3455+
33963456
:type kwargs: dict
33973457
:param kwargs: A list of fields and their values to initialize in the struct.
33983458
@@ -3401,7 +3461,7 @@ class _StructModule(object):
34013461
34023462
:rtype: :class:`_DynamicStructBuilder`
34033463
"""
3404-
return _new_message(self, kwargs, num_first_segment_words)
3464+
return _new_message(self, kwargs, num_first_segment_words, allocate_seg_callable)
34053465

34063466

34073467
class _InterfaceModule(object):
@@ -3758,6 +3818,50 @@ cdef class _MallocMessageBuilder(_MessageBuilder):
37583818
self.thisptr = new schema_cpp.MallocMessageBuilder(size)
37593819

37603820

3821+
cdef class _PyCustomMessageBuilder(_MessageBuilder):
3822+
"""The class for building Cap'n Proto messages,
3823+
with customised memory allocation strategy
3824+
3825+
You will use this class if you want to customise the allocateSegment method,
3826+
and define your own memory allocation strategy.
3827+
"""
3828+
def __init__(self, allocate_seg_callable, size=None):
3829+
""" The constructor requires you to provide a Python callable object as a parameter.
3830+
This callable object will be invoked in the allocateSegment method of the MessageBuilder
3831+
to allocate memory. The allocated memory will be managed within the MessageBuilder.
3832+
3833+
:type allocate_seg_callable: Callable[[int], bytearray]
3834+
:param allocate_seg_callable: A python callable object that takes the minimum number of 8-byte
3835+
words to allocate (as an `int`) and returns a `bytearray`. This is used to customize the memory
3836+
allocation strategy.
3837+
3838+
Required function signature is like this:
3839+
def __call__(self, minimum_size: int) -> bytearray:
3840+
Note that the unit of minimum_size is words, ie. 8 byte increments.
3841+
3842+
class Allocator:
3843+
def __init__(self):
3844+
self.cur_size = 0
3845+
def __call__(self, minimum_size: int) -> bytearray:
3846+
size = max(minimum_size, self.cur_size)
3847+
self.cur_size += size
3848+
WORD_SIZE = 8
3849+
byte_count = size * WORD_SIZE
3850+
return bytearray(byte_count)
3851+
3852+
addressbook = capnp.load('addressbook.capnp')
3853+
message = capnp._PyCustomMessageBuilder(allocator)
3854+
person = message.init_root(addressbook.Person)
3855+
3856+
:type size: int
3857+
:param size: Size of the first segment to allocate (in words ie. 8 byte increments)
3858+
"""
3859+
if size is None:
3860+
self.thisptr = new schema_cpp.PyCustomMessageBuilder(<PyObject*>allocate_seg_callable)
3861+
else:
3862+
self.thisptr = new schema_cpp.PyCustomMessageBuilder(<PyObject*>allocate_seg_callable, size)
3863+
3864+
37613865
cdef class _MessageReader:
37623866
"""An abstract base class for reading Cap'n Proto messages
37633867

0 commit comments

Comments
 (0)