1
1
# This file was generated by pbtools.
2
2
3
- class PbtoolsError(Error):
4
- message: string
5
-
6
- @enum
7
- class WireType:
8
- Varint = 0
9
- Bits64 = 1
10
- LengthDelimited = 2
11
- Bits32 = 5
12
-
13
- class Encoder:
14
- pos: u64
15
- buf: bytes
16
-
17
- def __init__(self):
18
- self.pos = 99
19
- self.buf = bytes(100)
20
-
21
- def write_enum(self, field_number: i64, value: i64):
22
- self.write(field_number, i32(value))
23
-
24
- def write(self, field_number: i64, value: string):
25
- if len(value) == 0:
26
- return
27
-
28
- data = value.to_utf8()
29
- self.write(data)
30
- self.write_tagged_varint(field_number, WireType.LengthDelimited, len(data))
31
-
32
- def write(self, field_number: i64, value: i32):
33
- self.write_tagged_varint(field_number, WireType.Varint, u64(i64(value)))
34
-
35
- def write(self, value: bytes):
36
- self.pos -= len(value)
37
-
38
- for i in range(i64(len(value))):
39
- self.buf[i64(self.pos) + 1 + i] = value[i]
40
-
41
- def write_length_delimited(self, field_number: i64, value: u64):
42
- self.write_varint(value)
43
- self.write_tag(field_number, WireType.LengthDelimited)
44
-
45
- def write_tagged_varint(self, field_number: i64, wire_type: WireType, value: u64):
46
- if value == 0:
47
- return
48
-
49
- self.write_varint(value)
50
- self.write_tag(field_number, wire_type)
51
-
52
- def write_varint(self, value: u64):
53
- buf = b""
54
-
55
- while True:
56
- buf += u8(value | 0x80)
57
- value >>= 7
58
-
59
- if value == 0:
60
- break
61
-
62
- buf[-1] &= 0x7f
63
- self.write(buf)
64
-
65
- def write_tag(self, field_number: i64, wire_type: WireType):
66
- self.write_varint(u64((field_number << 3) | i64(wire_type)))
67
-
68
- def data(self) -> bytes:
69
- r = b""
70
-
71
- for i in range(i64(self.pos + 1), i64(len(self.buf))):
72
- r += self.buf[i]
73
-
74
- return r
75
-
76
- class Decoder:
77
- data: bytes
78
- pos: i64
79
-
80
- def __init__(self, data: bytes):
81
- self.data = data
82
- self.pos = 0
83
-
84
- def available(self) -> bool:
85
- return self.pos < i64(len(self.data))
86
-
87
- def get(self) -> u8:
88
- if not self.available():
89
- raise PbtoolsError("Out of data.")
90
-
91
- value = self.data[self.pos]
92
- self.pos += 1
93
-
94
- return value
95
-
96
- def read_varint(self) -> u64:
97
- value: u64 = 0
98
- offset: u64 = 0
99
- byte: u8 = 0x80
100
-
101
- while (offset < 64) and ((byte & 0x80) == 0x80):
102
- byte = self.get()
103
- value |= ((u64(byte) & 0x7f) << offset)
104
- offset += 7
105
-
106
- if (byte & 0x80) == 0x80:
107
- raise PbtoolsError("Varint overflow.")
108
-
109
- return value
110
-
111
- def read_varint_check_wire_type(self,
112
- wire_type: WireType,
113
- expected_wire_type: WireType) -> u64:
114
- if wire_type != expected_wire_type:
115
- raise PbtoolsError("Unecpected wire type.")
116
-
117
- return self.read_varint()
118
-
119
- def read_varint_check_wire_type_varint(self, wire_type: WireType) -> u64:
120
- return self.read_varint_check_wire_type(wire_type, WireType.Varint)
121
-
122
- def read_tag(self) -> (i64, WireType):
123
- value = self.read_varint()
124
- field_number = i64(value >> 3)
125
-
126
- if field_number == 0:
127
- raise PbtoolsError("Field number is zero.")
128
-
129
- return (field_number, WireType(i64(value) & 7))
130
-
131
- def read_length_delimited(self, wire_type: WireType) -> u64:
132
- return self.read_varint_check_wire_type(wire_type, WireType.LengthDelimited)
133
-
134
- def read_string(self, wire_type: WireType) -> string:
135
- size = self.read_length_delimited(wire_type)
136
-
137
- return string(self.read(size))
138
-
139
- def read_i32(self, wire_type: WireType) -> i32:
140
- return i32(self.read_i64(wire_type))
141
-
142
- def read_i64(self, wire_type: WireType) -> i64:
143
- return i64(self.read_varint_check_wire_type_varint(wire_type))
144
-
145
- def read(self, size: u64) -> bytes:
146
- data = b""
147
-
148
- while size > 0:
149
- data += self.get()
150
- size -= 1
151
-
152
- return data
153
-
154
- def seek(self, offset: i64):
155
- self.pos += offset
156
-
157
- def skip_field(self, wire_type: WireType):
158
- match wire_type:
159
- case WireType.Varint:
160
- self.read_varint()
161
- case WireType.Bits64:
162
- self.seek(8)
163
- case WireType.LengthDelimited:
164
- value = self.read_length_delimited(wire_type)
165
- self.seek(i64(value))
166
- case WireType.Bits32:
167
- self.seek(4)
168
- case _:
169
- raise PbtoolsError("Bad wire type")
3
+ from .pbtools import Decoder
4
+ from .pbtools import Encoder
5
+ from .pbtools import PbtoolsError
6
+ from .pbtools import WireType
170
7
171
8
@enum
172
9
class PersonPhoneType:
@@ -178,16 +15,38 @@ class PersonPhoneNumber:
178
15
number: string
179
16
type: PersonPhoneType
180
17
18
+ def clear(self):
19
+ self.number = ""
20
+ self.type = PersonPhoneType.Mobile
21
+
181
22
def to_bytes_inner(self, encoder: Encoder):
182
23
encoder.write_enum(2, i64(self.type))
183
24
encoder.write(1, self.number)
184
25
26
+ def from_bytes_inner(self, decoder: Decoder):
27
+ while decoder.available():
28
+ field_number, wire_type = decoder.read_tag()
29
+
30
+ match field_number:
31
+ case 1:
32
+ self.number = decoder.read_string(wire_type)
33
+ case 2:
34
+ self.type = PersonPhoneType(i64(decoder.read_i32(wire_type)))
35
+ case _:
36
+ decoder.skip_field(wire_type)
37
+
185
38
class Person:
186
39
name: string
187
40
id: i32
188
41
email: string
189
42
phones: [PersonPhoneNumber]
190
43
44
+ def clear(self):
45
+ self.name = ""
46
+ self.id = 0
47
+ self.email = ""
48
+ self.phones = []
49
+
191
50
def to_bytes_inner(self, encoder: Encoder):
192
51
for phone in reversed(self.phones):
193
52
pos = encoder.pos
@@ -209,20 +68,28 @@ class Person:
209
68
self.id = decoder.read_i32(wire_type)
210
69
case 3:
211
70
self.email = decoder.read_string(wire_type)
71
+ case 4:
72
+ decoder.read_length_delimited(WireType.LengthDelimited)
73
+ phone = PersonPhoneNumber("", PersonPhoneType.Mobile)
74
+ phone.from_bytes_inner(decoder)
75
+ self.phones.append(phone)
212
76
case _:
213
77
decoder.skip_field(wire_type)
214
78
215
79
class AddressBook:
216
80
people: [Person]
217
81
82
+ def clear(self):
83
+ self.people.clear()
84
+
218
85
def to_bytes(self) -> bytes:
219
86
encoder = Encoder()
220
87
self.to_bytes_inner(encoder)
221
88
222
89
return encoder.data()
223
90
224
91
def from_bytes(self, data: bytes):
225
- self.people. clear()
92
+ self.clear()
226
93
self.from_bytes_inner(Decoder(data))
227
94
228
95
def to_bytes_inner(self, encoder: Encoder):
0 commit comments