Skip to content

Commit 144acbe

Browse files
Yevhenii Kizimhaata
authored andcommitted
Add structure-free read_multiple_bytes_packed
Motivation: A server sends data packages that consist of multiple serialized capnproto messages of different structures. Every message is guaranteed to have the same first field, which works as a message header containing information about the message structure type. The scheme comprises the `UnknownMessage` structure that allows parsing the header only. Solution: provide a public interface that iterates buffer with AnyPointer readers to cast a message to `UnknownMessage` first and then to a specific structure type.
1 parent 237fa7d commit 144acbe

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed

capnp/lib/capnp.pyx

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4119,6 +4119,44 @@ cdef class _MultipleBytesPackedMessageReader:
41194119
return self
41204120

41214121

4122+
cdef class _MultipleBytesPackedAnyMessageReader:
4123+
cdef schema_cpp.ArrayInputStream * stream
4124+
cdef schema_cpp.BufferedInputStream * buffered_stream
4125+
cdef Py_buffer view
4126+
4127+
cdef public object traversal_limit_in_words, nesting_limit, schema, buf
4128+
4129+
def __init__(self, buf, traversal_limit_in_words=None, nesting_limit=None):
4130+
self.traversal_limit_in_words = traversal_limit_in_words
4131+
self.nesting_limit = nesting_limit
4132+
4133+
if PyObject_GetBuffer(buf, &self.view, PyBUF_SIMPLE) != 0:
4134+
raise KjException("could not get read buffer")
4135+
4136+
self.buf = buf
4137+
self.stream = new schema_cpp.ArrayInputStream(schema_cpp.ByteArrayPtr(<byte *>self.view.buf, self.view.len))
4138+
self.buffered_stream = new schema_cpp.BufferedInputStreamWrapper(deref(self.stream))
4139+
4140+
def __dealloc__(self):
4141+
PyBuffer_Release(&self.view)
4142+
del self.buffered_stream
4143+
del self.stream
4144+
4145+
def __next__(self):
4146+
try:
4147+
reader = _PackedMessageReader()._init(
4148+
deref(self.buffered_stream), self.traversal_limit_in_words, self.nesting_limit, self)
4149+
return reader.get_root_as_any()
4150+
except KjException as e:
4151+
if 'EOF' in str(e):
4152+
raise StopIteration
4153+
else:
4154+
raise
4155+
4156+
def __iter__(self):
4157+
return self
4158+
4159+
41224160
@cython.internal
41234161
cdef class _AlignedBuffer:
41244162
cdef char * buf
@@ -4418,6 +4456,25 @@ def load(file_name, display_name=None, imports=[]):
44184456
return _global_schema_parser.load(file_name, display_name, imports)
44194457

44204458

4459+
def read_multiple_bytes_packed(buf, traversal_limit_in_words=None, nesting_limit=None):
4460+
"""Returns an iterable, that when traversed will return Readers for AnyPointer messages.
4461+
4462+
:type buf: buffer
4463+
:param buf: Any Python object that supports the buffer interface.
4464+
4465+
:type traversal_limit_in_words: int
4466+
:param traversal_limit_in_words: Limits how many total words of data are allowed to be traversed.
4467+
Is actually a uint64_t, and values can be up to 2^64-1. Default is 8*1024*1024.
4468+
4469+
:type nesting_limit: int
4470+
:param nesting_limit: Limits how many total words of data are allowed to be traversed. Default is 64.
4471+
4472+
:rtype: Iterable with elements of :class:`_DynamicStructReader`"""
4473+
4474+
reader = _MultipleBytesPackedAnyMessageReader(buf, traversal_limit_in_words, nesting_limit)
4475+
return reader
4476+
4477+
44214478
# Automatically include the system and built-in capnp paths
44224479
# Highest priority at position 0
44234480
_capnp_paths = [

test/test_structs_sequence.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import os
2+
3+
import pytest
4+
5+
import capnp
6+
7+
this_dir = os.path.dirname(__file__)
8+
9+
10+
@pytest.fixture
11+
def message_schemas():
12+
return capnp.load(os.path.join(this_dir, "test_structs_sequence.capnp"))
13+
14+
15+
@pytest.fixture
16+
def make_apple(message_schemas):
17+
def _make_apple(color: str):
18+
apple = message_schemas.Apple.new_message()
19+
apple.fruitId = message_schemas.FruitId.apple
20+
apple.color = color
21+
return apple
22+
23+
return _make_apple
24+
25+
26+
@pytest.fixture
27+
def red_apple(make_apple):
28+
return make_apple("Red")
29+
30+
31+
@pytest.fixture
32+
def green_apple(make_apple):
33+
return make_apple("Green")
34+
35+
36+
@pytest.fixture
37+
def banana(message_schemas):
38+
banana_ = message_schemas.Banana.new_message()
39+
banana_.fruitId = message_schemas.FruitId.banana
40+
banana_.length = 12.345
41+
return banana_
42+
43+
44+
@pytest.fixture
45+
def cherry(message_schemas):
46+
cherry_ = message_schemas.Cherry.new_message()
47+
cherry_.fruitId = message_schemas.FruitId.cherry
48+
cherry_.sweetness = 64
49+
return cherry_
50+
51+
52+
@pytest.fixture
53+
def fruit_basket(cherry, red_apple, banana, green_apple):
54+
return [cherry, red_apple, banana, green_apple]
55+
56+
57+
@pytest.fixture
58+
def fruit_basket_encoded(fruit_basket):
59+
return b"".join(fruit.to_bytes_packed() for fruit in fruit_basket)
60+
61+
62+
@pytest.fixture
63+
def expected(fruit_basket):
64+
return [fruit.to_dict() for fruit in fruit_basket]
65+
66+
67+
def test_parse_structs_sequence(message_schemas, fruit_basket_encoded, expected):
68+
# ARRANGE
69+
reader = capnp.read_multiple_bytes_packed(fruit_basket_encoded)
70+
71+
def _parse_fruit(any_):
72+
unknown_fruit = any_.as_struct(message_schemas.UnknownFruit)
73+
if unknown_fruit.fruitId == message_schemas.FruitId.apple:
74+
return any_.as_struct(message_schemas.Apple)
75+
76+
if unknown_fruit.fruitId == message_schemas.FruitId.banana:
77+
return any_.as_struct(message_schemas.Banana)
78+
79+
if unknown_fruit.fruitId == message_schemas.FruitId.cherry:
80+
return any_.as_struct(message_schemas.Cherry)
81+
82+
return unknown_fruit
83+
84+
# ACT
85+
parsed = [_parse_fruit(any_).to_dict() for any_ in reader]
86+
87+
# ASSERT
88+
assert parsed == expected
89+
90+
91+
def test_empty_sequence():
92+
reader = capnp.read_multiple_bytes_packed(b"")
93+
assert len(list(reader)) == 0

0 commit comments

Comments
 (0)