Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Kafka Protocol #197

Merged
merged 22 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c4e726b
Add kafka event and conversions.
davidwmartines Nov 9, 2022
ca8d877
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 9, 2022
086b135
Remove kafka CloudEvent class
davidwmartines Nov 9, 2022
ef71c6e
Update conversion and init
davidwmartines Nov 9, 2022
070f1ab
Merge branch 'add-kafka-protocol' of https://github.com/davidwmartine…
davidwmartines Nov 9, 2022
72fb800
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 9, 2022
714c159
Fix formatting.
davidwmartines Nov 10, 2022
4bd1405
Add tests for kafka binary conversion.
davidwmartines Nov 10, 2022
1af9b22
Catch marshalling errors, raise cloud_exceptions.
davidwmartines Nov 10, 2022
cb6c0a2
Add tests for to/from structured.
davidwmartines Nov 10, 2022
6d63e47
Merge branch 'add-kafka-protocol' of https://github.com/davidwmartine…
davidwmartines Nov 10, 2022
0d58b97
Fix spacing issues.
davidwmartines Nov 11, 2022
5d84392
Rename ProtocolMessage to KafkaMessage.
davidwmartines Nov 11, 2022
dd21f16
Correct type annotations.
davidwmartines Nov 11, 2022
6b7c373
Use .create function.
davidwmartines Nov 11, 2022
b1cee7e
Simplify failing serdes function.
davidwmartines Nov 11, 2022
bfd17cc
Organize tests into classes.
davidwmartines Nov 11, 2022
1a6cecb
Fix partitionkey attribute name and logic.
davidwmartines Nov 11, 2022
eea5ae1
Add key_mapper option.
davidwmartines Nov 13, 2022
acebd72
Refactor tests, raise KeyMapperError
davidwmartines Nov 13, 2022
d5921a9
Add copyright.x
davidwmartines Nov 14, 2022
31e8fbc
Remove optional typing.
davidwmartines Nov 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cloudevents/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from cloudevents.kafka.conversion import (
ProtocolMessage,
from_binary,
from_structured,
to_binary,
to_structured,
)

__all__ = [
to_binary,
from_binary,
to_structured,
from_structured,
ProtocolMessage,
]
214 changes: 214 additions & 0 deletions cloudevents/kafka/conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import json
import typing

from cloudevents import exceptions as cloud_exceptions
from cloudevents import http
from cloudevents.abstract import AnyCloudEvent
from cloudevents.sdk import types

DEFAULT_MARSHALLER = json.dumps
DEFAULT_UNMARSHALLER = json.loads
DEFAULT_EMBEDDED_DATA_MARSHALLER = lambda x: x


class ProtocolMessage(typing.NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure from where this name comes. Is it some standard conventional name for Kafka?

I've glanced at the spec, looks like in Kafka they have either a Message or a Record. Maybe rename this to KafkaMessage instead?

"""
A raw kafka-protocol message.
"""

headers: typing.Dict[str, bytes]
key: typing.Union[bytes, str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I see from the spec, the name of this attribute should be partitionkey in the CloudEvent.

Every implementation SHOULD provide an opt-in "Key Mapper" implementation that maps the [Partitioning](https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/partitioning.md) partitionkey attribute value to the 'key' of the Kafka message as-is, if present.

value: typing.Union[bytes, str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add missing docs?



def to_binary(
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
event: AnyCloudEvent, data_marshaller: types.Optional[types.MarshallerType] = None

) -> ProtocolMessage:
"""
Returns a Kafka ProtocolMessage in binary format representing this Cloud Event.

:param event: The event to be converted.
:param data_marshaller: Callable function to cast event.data into
either a string or bytes.
:returns: ProtocolMessage
"""
data_marshaller = data_marshaller or DEFAULT_MARSHALLER
headers = {}
if event["content-type"]:
headers["content-type"] = event["content-type"].encode("utf-8")
for attr, value in event.get_attributes().items():
if attr not in ["data", "key", "content-type"]:
if value is not None:
headers["ce_{0}".format(attr)] = value.encode("utf-8")

try:
data = data_marshaller(event.data)
except Exception as e:
raise cloud_exceptions.DataMarshallerError(
f"Failed to marshall data with error: {type(e).__name__}('{e}')"
)
if isinstance(data, str):
data = data.encode("utf-8")

return ProtocolMessage(headers, event.get("key"), data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So as stated in the Kafka spec, we should provide a way for a user to pass a mapper function for getting the key.

I'd very much like to have that option and have a default that's gonna just use .get("key"). Also, while the key is a required parameter, maybe we can move this key mapping functionality to the very top of the method and fail fast with an error if it is not possible to get a key?



def from_binary(
message: ProtocolMessage,
event_type: typing.Type[AnyCloudEvent] = None,
data_unmarshaller: types.MarshallerType = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_type: typing.Type[AnyCloudEvent] = None,
data_unmarshaller: types.MarshallerType = None,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.MarshallerType] = None,

) -> AnyCloudEvent:
"""
Returns a CloudEvent from a Kafka ProtocolMessage in binary format.

:param message: The ProtocolMessage to be converted.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.

:param data_unmarshaller: Callable function to map data to a python object
:returns: CloudEvent
"""

data_unmarshaller = data_unmarshaller or DEFAULT_UNMARSHALLER
event_type = event_type or http.CloudEvent

attributes = {}

for header, value in message.headers.items():
header = header.lower()
if header == "content-type":
attributes["content-type"] = value.decode()
elif header.startswith("ce_"):
attributes[header[3:]] = value.decode()

if message.key is not None:
xSAVIKx marked this conversation as resolved.
Show resolved Hide resolved
attributes["key"] = message.key

try:
data = data_unmarshaller(message.value)
except Exception as e:
raise cloud_exceptions.DataUnmarshallerError(
f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
)

return event_type(attributes, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should be using event_type.create() method instead of constructor.



def to_structured(
event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None,
envelope_marshaller: types.MarshallerType = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
data_marshaller: types.MarshallerType = None,
envelope_marshaller: types.MarshallerType = None,
data_marshaller: typing.Optional[types.MarshallerType] = None,
envelope_marshaller: typing.Optional[types.MarshallerType] = None,

) -> ProtocolMessage:
"""
Returns a Kafka ProtocolMessage in structured format representing this Cloud Event.

:param event: The event to be converted.
:param data_marshaller: Callable function to cast event.data into
either a string or bytes.
:param envelope_marshaller: Callable function to cast event envelope into
either a string or bytes.
:returns: ProtocolMessage
"""
data_marshaller = data_marshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
envelope_marshaller = envelope_marshaller or DEFAULT_MARSHALLER

attrs = event.get_attributes().copy()

try:
data = data_marshaller(event.data)
except Exception as e:
raise cloud_exceptions.DataMarshallerError(
f"Failed to marshall data with error: {type(e).__name__}('{e}')"
)
if isinstance(data, (bytes, bytes, memoryview)):
attrs["data_base64"] = base64.b64encode(data).decode("ascii")
else:
attrs["data"] = data

headers = {}
if "content-type" in attrs:
headers["content-type"] = attrs.pop("content-type").encode("utf-8")

if "key" in attrs:
key = attrs.pop("key")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as per the spec, we must not modify the cloudevent itself, meaning that we must not .pop() the key here, but rather just .get() it.

else:
key = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. key is a required property. We shouldn't allow events without a proper key.

It also would be great to have that key mapping functionality I mentioned above here.


try:
value = envelope_marshaller(attrs)
except Exception as e:
raise cloud_exceptions.DataMarshallerError(
f"Failed to marshall event with error: {type(e).__name__}('{e}')"
)

if isinstance(value, str):
value = value.encode("utf-8")

return ProtocolMessage(headers, key, value)


def from_structured(
message: ProtocolMessage,
event_type: typing.Type[AnyCloudEvent] = None,
data_unmarshaller: types.MarshallerType = None,
envelope_unmarshaller: types.MarshallerType = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_type: typing.Type[AnyCloudEvent] = None,
data_unmarshaller: types.MarshallerType = None,
envelope_unmarshaller: types.MarshallerType = None,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.MarshallerType] = None,
envelope_unmarshaller: typing.Optional[types.MarshallerType] = None,

) -> AnyCloudEvent:
"""
Returns a CloudEvent from a Kafka ProtocolMessage in structured format.

:param message: The ProtocolMessage to be converted.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.

:param data_unmarshaller: Callable function to map the data to a python object.
:param envelope_unmarshaller: Callable functionto map the envelope to a python
object.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:param envelope_unmarshaller: Callable functionto map the envelope to a python
object.
:param envelope_unmarshaller: Callable function to map the envelope to a python
object.

:returns: CloudEvent
"""

data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER
event_type = event_type or http.CloudEvent

try:
structure = envelope_unmarshaller(message.value)
except Exception as e:
raise cloud_exceptions.DataUnmarshallerError(
"Failed to unmarshall message with error: " f"{type(e).__name__}('{e}')"
)

attributes = {"key": message.key}

for name, value in structure.items():
decoder = lambda x: x
if name == "data":
decoder = lambda v: data_unmarshaller(v)
if name == "data_base64":
decoder = lambda v: data_unmarshaller(base64.b64decode(v))
name = "data"

try:
decoded_value = decoder(value)
except Exception as e:
raise cloud_exceptions.DataUnmarshallerError(
"Failed to unmarshall data with error: " f"{type(e).__name__}('{e}')"
)
if name == "data":
data = decoded_value
else:
attributes[name] = decoded_value

for header, val in message.headers.items():
attributes[header.lower()] = val.decode()

return event_type(attributes, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use event_type.create()

Loading