Skip to content

Commit de61dd9

Browse files
feat: Kafka Protocol (cloudevents#197)
* Add kafka event and conversions. Signed-off-by: davidwmartines <d5172@yahoo.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Remove kafka CloudEvent class Signed-off-by: davidwmartines <d5172@yahoo.com> * Update conversion and init Signed-off-by: davidwmartines <d5172@yahoo.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix formatting. Signed-off-by: davidwmartines <d5172@yahoo.com> * Add tests for kafka binary conversion. Signed-off-by: davidwmartines <d5172@yahoo.com> * Catch marshalling errors, raise cloud_exceptions. Signed-off-by: davidwmartines <d5172@yahoo.com> * Add tests for to/from structured. Signed-off-by: davidwmartines <d5172@yahoo.com> * Fix spacing issues. Signed-off-by: davidwmartines <d5172@yahoo.com> * Rename ProtocolMessage to KafkaMessage. Signed-off-by: davidwmartines <d5172@yahoo.com> * Correct type annotations. Signed-off-by: davidwmartines <d5172@yahoo.com> * Use .create function. Signed-off-by: davidwmartines <d5172@yahoo.com> * Simplify failing serdes function. Signed-off-by: davidwmartines <d5172@yahoo.com> * Organize tests into classes. Signed-off-by: davidwmartines <d5172@yahoo.com> * Fix partitionkey attribute name and logic. Signed-off-by: davidwmartines <d5172@yahoo.com> * Add key_mapper option. Signed-off-by: davidwmartines <d5172@yahoo.com> * Refactor tests, raise KeyMapperError Signed-off-by: davidwmartines <d5172@yahoo.com> * Add copyright.x Signed-off-by: davidwmartines <d5172@yahoo.com> * Remove optional typing. Signed-off-by: davidwmartines <d5172@yahoo.com> Signed-off-by: davidwmartines <d5172@yahoo.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 6648eb5 commit de61dd9

File tree

4 files changed

+825
-0
lines changed

4 files changed

+825
-0
lines changed

cloudevents/kafka/__init__.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
from cloudevents.kafka.conversion import (
16+
KafkaMessage,
17+
KeyMapper,
18+
from_binary,
19+
from_structured,
20+
to_binary,
21+
to_structured,
22+
)
23+
24+
# Public API of the package. ``__all__`` entries must be strings: a star-import
# looks each entry up by name and raises TypeError when given the objects
# themselves.
__all__ = [
    "KafkaMessage",
    "KeyMapper",
    "from_binary",
    "from_structured",
    "to_binary",
    "to_structured",
]

cloudevents/kafka/conversion.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
import base64
15+
import json
16+
import typing
17+
18+
from cloudevents import exceptions as cloud_exceptions
19+
from cloudevents import http
20+
from cloudevents.abstract import AnyCloudEvent
21+
from cloudevents.kafka.exceptions import KeyMapperError
22+
from cloudevents.sdk import types
23+
24+
# Default marshaller: serializes a python object to a JSON string. Used for
# event data in binary mode and for the whole envelope in structured mode.
DEFAULT_MARSHALLER: types.MarshallerType = json.dumps
# Default unmarshaller: parses JSON text back into a python object.
DEFAULT_UNMARSHALLER: types.MarshallerType = json.loads
# Identity function: in structured mode the data is embedded in the envelope
# unchanged by default, both when marshalling and when unmarshalling.
DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = lambda x: x
27+
28+
29+
class KafkaMessage(typing.NamedTuple):
    """
    Client-agnostic container for the three parts of a Kafka message.

    Code using a specific Kafka client converts that client's message object
    to or from this tuple so that it can be passed to, or obtained from, the
    cloudevents.kafka conversion functions.
    """

    # Message headers, keyed by header name.
    headers: typing.Dict[str, bytes]

    # Message key; may be None.
    key: typing.Optional[typing.Union[bytes, str]]

    # Message value.
    value: typing.Union[bytes, str]
50+
51+
52+
# Type alias for user-supplied key mappers accepted by to_binary and
# to_structured.
KeyMapper = typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]
"""
A callable function that creates a Kafka message key, given a CloudEvent instance.
"""

# NOTE(review): `event.get` presumably returns None when the event has no
# `partitionkey` attribute, which falls outside the declared return type of
# KeyMapper -- confirm and consider widening the alias to Optional.
DEFAULT_KEY_MAPPER: KeyMapper = lambda event: event.get("partitionkey")
"""
The default KeyMapper which maps the user provided `partitionkey` attribute value
to the `key` of the Kafka message as-is, if present.
"""
62+
63+
64+
def to_binary(
    event: AnyCloudEvent,
    data_marshaller: typing.Optional[types.MarshallerType] = None,
    key_mapper: typing.Optional[KeyMapper] = None,
) -> KafkaMessage:
    """
    Returns a KafkaMessage in binary format representing this Cloud Event.

    :param event: The event to be converted. To specify the Kafka message key, set
        the `partitionkey` attribute of the event, or provide a KeyMapper.
    :param data_marshaller: Callable function to cast event.data into
        either a string or bytes.
    :param key_mapper: Callable function to get the Kafka message key.
    :returns: KafkaMessage
    :raises KeyMapperError: If the key mapper raises any exception.
    :raises cloudevents.exceptions.DataMarshallerError: If the data marshaller
        raises any exception.
    """
    data_marshaller = data_marshaller or DEFAULT_MARSHALLER
    key_mapper = key_mapper or DEFAULT_KEY_MAPPER

    try:
        message_key = key_mapper(event)
    except Exception as e:
        raise KeyMapperError(
            f"Failed to map message key with error: {type(e).__name__}('{e}')"
        ) from e

    headers = {}
    # Use .get() rather than subscripting so that an event without a
    # content-type attribute is converted instead of failing on the lookup.
    content_type = event.get("content-type")
    if content_type:
        headers["content-type"] = content_type.encode("utf-8")

    # Every remaining attribute that has a value becomes a `ce_`-prefixed
    # header; data, the partition key and the content type are carried
    # elsewhere in the message.
    for attr, value in event.get_attributes().items():
        if attr in ("data", "partitionkey", "content-type") or value is None:
            continue
        headers["ce_{0}".format(attr)] = value.encode("utf-8")

    try:
        data = data_marshaller(event.data)
    except Exception as e:
        raise cloud_exceptions.DataMarshallerError(
            f"Failed to marshall data with error: {type(e).__name__}('{e}')"
        ) from e
    if isinstance(data, str):
        data = data.encode("utf-8")

    return KafkaMessage(headers, message_key, data)
107+
108+
109+
def from_binary(
    message: KafkaMessage,
    event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
    data_unmarshaller: typing.Optional[types.MarshallerType] = None,
) -> AnyCloudEvent:
    """
    Returns a CloudEvent from a KafkaMessage in binary format.

    :param message: The KafkaMessage to be converted.
    :param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
    :param data_unmarshaller: Callable function to map data to a python object
    :returns: CloudEvent
    :raises cloudevents.exceptions.DataUnmarshallerError: If the data
        unmarshaller raises any exception.
    """
    data_unmarshaller = data_unmarshaller or DEFAULT_UNMARSHALLER
    event_type = event_type or http.CloudEvent

    attributes = {}

    # Header names are matched case-insensitively. Only `content-type` and
    # `ce_`-prefixed headers become attributes; any other header is ignored.
    for header, value in message.headers.items():
        header = header.lower()
        if header == "content-type":
            attributes["content-type"] = value.decode()
        elif header.startswith("ce_"):
            # Strip the `ce_` prefix to recover the attribute name.
            attributes[header[3:]] = value.decode()

    if message.key is not None:
        attributes["partitionkey"] = message.key

    try:
        data = data_unmarshaller(message.value)
    except Exception as e:
        # Chain explicitly so the original failure is kept as __cause__.
        raise cloud_exceptions.DataUnmarshallerError(
            f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
        ) from e

    return event_type.create(attributes, data)
146+
147+
148+
def to_structured(
    event: AnyCloudEvent,
    data_marshaller: typing.Optional[types.MarshallerType] = None,
    envelope_marshaller: typing.Optional[types.MarshallerType] = None,
    key_mapper: typing.Optional[KeyMapper] = None,
) -> KafkaMessage:
    """
    Returns a KafkaMessage in structured format representing this Cloud Event.

    :param event: The event to be converted. To specify the Kafka message key, set
        the `partitionkey` attribute of the event, or provide a KeyMapper.
    :param data_marshaller: Callable function to cast event.data into
        either a string or bytes.
    :param envelope_marshaller: Callable function to cast event envelope into
        either a string or bytes.
    :param key_mapper: Callable function to get the Kafka message key.
    :returns: KafkaMessage
    :raises KeyMapperError: If the key mapper raises any exception.
    :raises cloudevents.exceptions.DataMarshallerError: If the data marshaller
        or the envelope marshaller raises any exception.
    """
    data_marshaller = data_marshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
    envelope_marshaller = envelope_marshaller or DEFAULT_MARSHALLER
    key_mapper = key_mapper or DEFAULT_KEY_MAPPER

    try:
        message_key = key_mapper(event)
    except Exception as e:
        raise KeyMapperError(
            f"Failed to map message key with error: {type(e).__name__}('{e}')"
        ) from e

    # Work on a copy so the envelope can be modified without touching the event.
    attrs = event.get_attributes().copy()

    try:
        data = data_marshaller(event.data)
    except Exception as e:
        raise cloud_exceptions.DataMarshallerError(
            f"Failed to marshall data with error: {type(e).__name__}('{e}')"
        ) from e
    # Any bytes-like payload is embedded base64-encoded under `data_base64`;
    # everything else is embedded as-is under `data`.
    if isinstance(data, (bytes, bytearray, memoryview)):
        attrs["data_base64"] = base64.b64encode(data).decode("ascii")
    else:
        attrs["data"] = data

    # The content type is carried as a message header, not in the envelope.
    headers = {}
    if "content-type" in attrs:
        headers["content-type"] = attrs.pop("content-type").encode("utf-8")

    try:
        value = envelope_marshaller(attrs)
    except Exception as e:
        raise cloud_exceptions.DataMarshallerError(
            f"Failed to marshall event with error: {type(e).__name__}('{e}')"
        ) from e

    if isinstance(value, str):
        value = value.encode("utf-8")

    return KafkaMessage(headers, message_key, value)
205+
206+
207+
def from_structured(
    message: KafkaMessage,
    event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
    data_unmarshaller: typing.Optional[types.MarshallerType] = None,
    envelope_unmarshaller: typing.Optional[types.MarshallerType] = None,
) -> AnyCloudEvent:
    """
    Returns a CloudEvent from a KafkaMessage in structured format.

    :param message: The KafkaMessage to be converted.
    :param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
    :param data_unmarshaller: Callable function to map the data to a python object.
    :param envelope_unmarshaller: Callable function to map the envelope to a python
        object.
    :returns: CloudEvent
    :raises cloudevents.exceptions.DataUnmarshallerError: If the envelope
        unmarshaller or the data unmarshaller raises any exception.
    """
    data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
    envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER
    event_type = event_type or http.CloudEvent

    try:
        structure = envelope_unmarshaller(message.value)
    except Exception as e:
        raise cloud_exceptions.DataUnmarshallerError(
            f"Failed to unmarshall message with error: {type(e).__name__}('{e}')"
        ) from e

    attributes = {}
    if message.key is not None:
        attributes["partitionkey"] = message.key

    # Bound up front so an envelope with neither `data` nor `data_base64`
    # yields an event without data instead of an UnboundLocalError below.
    data = None
    for name, value in structure.items():
        if name not in ("data", "data_base64"):
            # Ordinary attribute: copied into the event unchanged.
            attributes[name] = value
            continue

        try:
            if name == "data_base64":
                # Binary payloads travel base64-encoded inside the envelope.
                value = base64.b64decode(value)
            data = data_unmarshaller(value)
        except Exception as e:
            raise cloud_exceptions.DataUnmarshallerError(
                f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
            ) from e

    # Every message header is copied into the attributes under its lowercased
    # name, overriding an envelope member of the same name.
    for header, val in message.headers.items():
        attributes[header.lower()] = val.decode()

    return event_type.create(attributes, data)

cloudevents/kafka/exceptions.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
from cloudevents import exceptions as cloud_exceptions
15+
16+
17+
class KeyMapperError(cloud_exceptions.GenericException):
    """
    Raised when a KeyMapper fails.

    Subclasses the package's GenericException, so it can be caught either
    specifically or through that base class.
    """

0 commit comments

Comments
 (0)