feat: Kafka Protocol #197
Signed-off-by: davidwmartines <d5172@yahoo.com>
@davidwmartines thank you for your PR! This looks really good to me.
Can you please take a look at the comments posted below? I believe there is a bit of a misunderstanding between the spec and the implementation (or I may have misunderstood the spec). It'd be great to align the implementation with the spec or raise some questions regarding how the spec should translate into implementation. As far as I see, all the other implementations use `partitionkey` over `key` in the CloudEvent attributes (https://github.com/search?q=org%3Acloudevents+partitionkey&type=code).
cloudevents/kafka/conversion.py
Outdated
Returns a CloudEvent from a Kafka ProtocolMessage in binary format.

:param message: The ProtocolMessage to be converted.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
cloudevents/kafka/conversion.py
Outdated
headers: typing.Dict[str, bytes]
key: typing.Union[bytes, str]
value: typing.Union[bytes, str]
Can you please add missing docs?
cloudevents/kafka/conversion.py
Outdated
DEFAULT_EMBEDDED_DATA_MARSHALLER = lambda x: x


class ProtocolMessage(typing.NamedTuple):
I'm not sure where this name comes from. Is it a standard conventional name in Kafka?
I've glanced at the spec, and it looks like Kafka has either a `Message` or a `Record`. Maybe rename this to `KafkaMessage` instead?
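A minimal sketch of what the renamed, documented container could look like. The field names come from the diff above; the comments and the sample values are illustrative assumptions, not the SDK's final code.

```python
import typing


class KafkaMessage(typing.NamedTuple):
    """Illustrative container for the parts of a Kafka message."""

    # Header name mapped to an already-encoded header value.
    headers: typing.Dict[str, bytes]
    # Message key, used by Kafka for partitioning.
    key: typing.Union[bytes, str]
    # Message payload.
    value: typing.Union[bytes, str]


# Sample values are made up for illustration.
message = KafkaMessage(
    headers={"ce_type": b"com.example.created"},
    key="order-1",
    value=b"{}",
)
```

Because it is a `NamedTuple`, the result can be unpacked positionally (`headers, key, value = message`) or accessed by field name.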
cloudevents/kafka/conversion.py
Outdated
def to_binary(
    event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
event: AnyCloudEvent, data_marshaller: types.Optional[types.MarshallerType] = None
cloudevents/kafka/conversion.py
Outdated
if isinstance(data, str):
    data = data.encode("utf-8")

return ProtocolMessage(headers, event.get("key"), data)
So as stated in the Kafka spec, we should provide a way for a user to pass a mapper function for getting the `key`.
I'd very much like to have that option and have a default that's gonna just use `.get("key")`. Also, while the `key` is a required parameter, maybe we can move this key mapping functionality to the very top of the method and fail fast with an error if it is not possible to get a key?
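The suggestion above could be sketched roughly as follows. This is only an illustration under assumptions: `Event`, `KeyMapperError`, `default_key_mapper`, and `to_message_parts` are hypothetical names, and a plain mapping stands in for a CloudEvent.

```python
import typing

# Hypothetical stand-in for AnyCloudEvent: a plain mapping of attributes.
Event = typing.Mapping[str, typing.Any]
KeyMapper = typing.Callable[[Event], typing.Optional[typing.Union[bytes, str]]]


class KeyMapperError(Exception):
    """Hypothetical error raised when the key cannot be derived."""


def default_key_mapper(event: Event) -> typing.Optional[typing.Union[bytes, str]]:
    # Default discussed in the review: read the "key" attribute.
    return event.get("key")


def to_message_parts(event: Event, key_mapper: typing.Optional[KeyMapper] = None):
    # Resolve the key first so a broken mapper fails before any other work.
    mapper = key_mapper or default_key_mapper
    try:
        key = mapper(event)
    except Exception as exc:
        raise KeyMapperError(f"Failed to map message key: {exc}") from exc

    data = event.get("data", "")
    if isinstance(data, str):
        data = data.encode("utf-8")
    return key, data
```

Running the mapper at the top means a failing mapper surfaces as a single, clearly named error before any marshalling happens.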
cloudevents/kafka/conversion.py
Outdated
"""

headers: typing.Dict[str, bytes]
key: typing.Union[bytes, str]
As far as I see from the spec, the name of this attribute should be `partitionkey` in the CloudEvent.
Every implementation SHOULD provide an opt-in "Key Mapper" implementation that maps the [Partitioning](https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/partitioning.md) partitionkey attribute value to the 'key' of the Kafka message as-is, if present.
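An opt-in mapper of the kind the spec describes might look like this. It is a sketch only: `partitionkey_mapper` is a hypothetical name, and the event is modelled as a plain mapping rather than a real CloudEvent.

```python
import typing


def partitionkey_mapper(event: typing.Mapping[str, typing.Any]) -> typing.Optional[str]:
    # Pass the partitionkey extension attribute through as-is, if present.
    return event.get("partitionkey")


# Illustrative event attributes; the values are made up.
event = {"type": "com.example.created", "partitionkey": "customer-42"}
kafka_key = partitionkey_mapper(event)
```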
@pytest.fixture
def bad_marshaller() -> types.MarshallerType:
    return _throw
why not just raise an error here?
def test_throw_raises():
    with pytest.raises(Exception):
        _throw()
That definitely should not be here. Why write a test for a method from a test?
def test_to_structured_none_key(source_event):
    source_event["key"] = None
    result = to_structured(source_event)
    assert result.key is None


def test_to_structured_no_key(source_event):
    del source_event["key"]
    result = to_structured(source_event)
    assert result.key is None
I believe these tests should actually assert that an error is raised when no `partitionkey` is available in the event or the mapping function fails to do its job.
@xSAVIKx Thanks for the great feedback. I will get to work on this! I agree on all points, except for requiring the key. Kafka does allow for null message keys, so I think we can allow a key of `None`. Good call on the key mapper function - I was wondering about that. I will add the option to supply a key-mapper as well.
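One way to reconcile the two positions is to let a missing key map to `None` while still raising when the mapper itself fails. The tests below sketch that behaviour; `map_key` and `KeyMapperError` are hypothetical names, and plain dicts stand in for CloudEvents.

```python
import typing


class KeyMapperError(Exception):
    """Hypothetical error raised when a key mapper fails."""


def map_key(
    event: typing.Dict[str, typing.Any],
    key_mapper: typing.Optional[typing.Callable[[typing.Dict[str, typing.Any]], typing.Any]] = None,
) -> typing.Any:
    """Return the Kafka key for an event, or None when no key is present."""
    mapper = key_mapper or (lambda e: e.get("key"))
    try:
        return mapper(event)
    except Exception as exc:
        raise KeyMapperError(str(exc)) from exc


def test_missing_key_maps_to_none():
    # Kafka allows null keys, so an absent key is not an error.
    assert map_key({"type": "com.example.created"}) is None


def test_failing_mapper_raises():
    # A mapper that blows up is reported as a KeyMapperError.
    try:
        map_key({}, key_mapper=lambda e: e["partitionkey"])
    except KeyMapperError:
        return
    raise AssertionError("expected KeyMapperError")
```

The test functions use only plain `assert` statements, so they can be collected by pytest or called directly.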
working on key_mapper next...
@davidwmartines overall LGTM! Thanks again for your work. Can you please address these 2 minor comments before we merge the PR?
cloudevents/kafka/conversion.py
Outdated
"""


KeyMapper = typing.Optional[typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]]
KeyMapper = typing.Optional[typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]]
KeyMapper = typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]
KeyMapper itself is not an optional function. But it is not required, so IMO `Optional` belongs to the function declaration, not the type alias.
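To illustrate the distinction, here is a small sketch in which the alias describes only the callable and the optionality sits on the parameter. `Event` and `resolve_key` are hypothetical names used for the example.

```python
import typing

# Hypothetical stand-in for AnyCloudEvent.
Event = typing.Mapping[str, typing.Any]

# The alias describes the callable itself; it is never None.
KeyMapper = typing.Callable[[Event], typing.Union[bytes, str]]


def resolve_key(event: Event, key_mapper: typing.Optional[KeyMapper] = None):
    # Optionality lives on the parameter: callers may omit the mapper.
    if key_mapper is None:
        return event.get("key")
    return key_mapper(event)
```

With this split, a function that requires a mapper can annotate its parameter as plain `KeyMapper`, and only functions that accept its absence need `typing.Optional[KeyMapper]`.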
@@ -0,0 +1,7 @@
from cloudevents import exceptions as cloud_exceptions
Can you please add a copyright header to this file?
Thanks!
Signed-off-by: davidwmartines d5172@yahoo.com
Fixes #196
Changes
This PR adds functions for converting CloudEvents to Kafka-protocol-ready objects, and for converting Kafka-protocol messages into CloudEvents, per the CloudEvents Kafka protocol binding spec.
Add Kafka Protocol support
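As a rough illustration of the binary content mode in the Kafka protocol binding, the sketch below maps string attributes to `ce_`-prefixed headers and `datacontenttype` to a `content-type` header, then reverses the mapping. It uses plain dicts and hypothetical function names (`to_binary_parts`, `from_binary_parts`); it is not the code added by this PR.

```python
import typing

Headers = typing.Dict[str, bytes]


def to_binary_parts(attributes: typing.Dict[str, str], data: bytes) -> typing.Tuple[Headers, bytes]:
    """Encode string attributes as Kafka headers; data becomes the message value."""
    headers: Headers = {}
    for name, value in attributes.items():
        if name == "datacontenttype":
            headers["content-type"] = value.encode("utf-8")
        else:
            headers[f"ce_{name}"] = value.encode("utf-8")
    return headers, data


def from_binary_parts(headers: Headers, value: bytes) -> typing.Tuple[typing.Dict[str, str], bytes]:
    """Reverse of to_binary_parts: rebuild attributes from the headers."""
    attributes: typing.Dict[str, str] = {}
    for name, raw in headers.items():
        if name == "content-type":
            attributes["datacontenttype"] = raw.decode("utf-8")
        elif name.startswith("ce_"):
            attributes[name[len("ce_"):]] = raw.decode("utf-8")
    return attributes, value


# Illustrative attribute values, made up for the example.
attrs = {
    "id": "1",
    "source": "/orders",
    "type": "com.example.created",
    "datacontenttype": "application/json",
}
headers, value = to_binary_parts(attrs, b'{"ok": true}')
```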