Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Kafka Protocol #197

Merged
merged 22 commits into from
Nov 17, 2022
Merged

Conversation

davidwmartines
Copy link
Contributor

@davidwmartines davidwmartines commented Nov 9, 2022

Signed-off-by: davidwmartines d5172@yahoo.com

Fixes #196

Changes

This PR adds functions for converting CloudEvents to Kakfa-protocol-ready objects, and for converting Kafka-protocol messages into CloudEvents, per the CloudEvents Kafka protocol binding spec.

  • Supports binary or structured mode conversions
  • Clients can supply appropriate serialization and deserializer functions as MarshallerType callables.

Add Kafka Protocol support

  • Tests pass
  • Appropriate changes to README are included in PR

davidwmartines and others added 2 commits November 8, 2022 23:45
Signed-off-by: davidwmartines <d5172@yahoo.com>
@davidwmartines davidwmartines changed the title Add kafka event and conversions. Kafka Protocol Nov 9, 2022
@davidwmartines

This comment was marked as resolved.

davidwmartines and others added 9 commits November 9, 2022 09:48
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
…s/sdk-python into add-kafka-protocol

Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
@davidwmartines davidwmartines changed the title Kafka Protocol feat: Kafka Protocol Nov 10, 2022
@davidwmartines davidwmartines marked this pull request as ready for review November 10, 2022 23:12
@xSAVIKx xSAVIKx self-requested a review November 11, 2022 08:35
Copy link
Member

@xSAVIKx xSAVIKx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidwmartines thank you for your PR! This looks really good to me.

Can you please take a look at the comments posted below? I believe there is a bit of a misunderstanding between the spec and the implementation (or I may have misunderstood the spec). It'd be great to align the implementation with the spec or raise some questions regarding how the spec should translate into implementation. As far as I see, all the other implementations use partitionkey over key in the CloudEvent attributes (https://github.com/search?q=org%3Acloudevents+partitionkey&type=code)

Returns a CloudEvent from a Kafka ProtocolMessage in binary format.

:param message: The ProtocolMessage to be converted.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.
:param event_type: The type of CloudEvent to create. Defaults to http.CloudEvent.

Comment on lines 33 to 35
headers: typing.Dict[str, bytes]
key: typing.Union[bytes, str]
value: typing.Union[bytes, str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add missing docs?

DEFAULT_EMBEDDED_DATA_MARSHALLER = lambda x: x


class ProtocolMessage(typing.NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure from where this name comes. Is it some standard conventional name for Kafka?

I've glanced at the spec, looks like in Kafka they have either a Message or a Record. Maybe rename this to KafkaMessage instead?



def to_binary(
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
event: AnyCloudEvent, data_marshaller: types.Optional[types.MarshallerType] = None

if isinstance(data, str):
data = data.encode("utf-8")

return ProtocolMessage(headers, event.get("key"), data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So as stated in the Kafka spec, we should provide a way for a user to pass a mapper function for getting the key.

I'd very much like to have that option and have a default that's gonna just use .get("key"). Also, while the key is a required parameter, maybe we can move this key mapping functionality to the very top of the method and fail fast with an error if it is not possible to get a key?

"""

headers: typing.Dict[str, bytes]
key: typing.Union[bytes, str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I see from the spec, the name of this attribute should be partitionkey in the CloudEvent.

Every implementation SHOULD provide an opt-in "Key Mapper" implementation that maps the [Partitioning](https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/partitioning.md) partitionkey attribute value to the 'key' of the Kafka message as-is, if present.


@pytest.fixture
def bad_marshaller() -> types.MarshallerType:
return _throw
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just raise an error here?

cloudevents/tests/test_kafka_conversions.py Show resolved Hide resolved
Comment on lines 188 to 190
def test_throw_raises():
with pytest.raises(Exception):
_throw()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's definitely should not be here. Why write a test for a method from a test?

Comment on lines 384 to 393
def test_to_structured_none_key(source_event):
source_event["key"] = None
result = to_structured(source_event)
assert result.key is None


def test_to_structured_no_key(source_event):
del source_event["key"]
result = to_structured(source_event)
assert result.key is None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these tests should actually assert that an error is raised when no partitionkey is available in the event or the mapping function fails to do it's job.

@davidwmartines
Copy link
Contributor Author

@xSAVIKx Thanks for the great feedback. I will get to work on this!

I agree on all points, except for requiring the key. Kafka does allow for null message keys, so I think we can allow a key of None. Let me know if you think this needs more discussion. I will try to clarify this when I add docs to the KafkaMessage class.

Good call on the key mapper function - I was wondering about that. I will add the option to supply a key-mapper as well.

Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
@davidwmartines
Copy link
Contributor Author

working on key_mapper next...

Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
Copy link
Member

@xSAVIKx xSAVIKx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidwmartines overall LGTM! thanks again for you work. Can you please address these 2 minor comments before we merge the PR?

"""


KeyMapper = typing.Optional[typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
KeyMapper = typing.Optional[typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]]
KeyMapper = typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]]

KeyMapper itself is not an optional function. But it is not required, so IMO Optional belongs to the function declaration, not the type alias.

@@ -0,0 +1,7 @@
from cloudevents import exceptions as cloud_exceptions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a copyright header to this file?

@xSAVIKx xSAVIKx added the enhancement New feature or request label Nov 14, 2022
Signed-off-by: davidwmartines <d5172@yahoo.com>
Signed-off-by: davidwmartines <d5172@yahoo.com>
@davidwmartines
Copy link
Contributor Author

@davidwmartines overall LGTM! thanks again for you work. Can you please address these 2 minor comments before we merge the PR?

Thanks!

@xSAVIKx xSAVIKx merged commit de61dd9 into cloudevents:main Nov 17, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Kafka protocol support
2 participants