PartitionKey Kafka Interceptor #260
Conversation
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
kafka/src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java
/lgtm
/approve
Looks good to me, but I am not in a position to verify if this works as expected. Unit tests look fine though.
/lgtm
@slinkydeveloper - Sorry, but this code doesn't look quite right to me. The interceptor should be calculating a partition-id based upon its configuration and the incoming record. The assertion should be validating against the partition-id instead of the key.
Please note: this implementation looks like it is rekeying the record; instead it should calculate the partition-id.
private void assertKey(ProducerRecord<Object, CloudEvent> record, Object expectedKey) {
    PartitionKeyExtensionInterceptor interceptor = new PartitionKeyExtensionInterceptor();
    assertThat(interceptor.onSend(record).key())
        .isEqualTo(expectedKey);
}
It should validate against a calculated partition-id and not change the key
return record;
}

return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), partitionKey, record.value(), record.headers());
The second field should be calculated based upon 1) the number of partitions, 2) the extension, and 3) the input record.
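To make that suggestion concrete, here is a minimal sketch of what a partition-assigning (rather than key-rewriting) interceptor could look like. This is a hypothetical alternative for illustration only, not part of this PR: the class name, the hard-coded partition count, and the hashing strategy are all assumptions.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Utils;

import io.cloudevents.CloudEvent;

// Hypothetical alternative: derive a partition id from the partitionkey extension
// and leave the record key untouched.
public class PartitionAssigningInterceptor implements ProducerInterceptor<Object, CloudEvent> {

    // In a real implementation this would come from configure() or a metadata lookup.
    private int numPartitions = 6;

    @Override
    public ProducerRecord<Object, CloudEvent> onSend(ProducerRecord<Object, CloudEvent> record) {
        Object partitionKey = record.value() != null ? record.value().getExtension("partitionkey") : null;
        if (partitionKey == null) {
            // No extension: defer to the default partitioner.
            return record;
        }
        // Hash the extension value into [0, numPartitions) and set it as the partition id.
        int partition = Utils.toPositive(partitionKey.toString().hashCode()) % numPartitions;
        return new ProducerRecord<>(record.topic(), partition, record.timestamp(), record.key(), record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```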
I don't think that's what the spec states: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping
I don't quite get what the implementation should look like to you; can you provide sample code or a PR showing how you would implement it?
In particular:
Every implementation SHOULD provide a default "Key Mapper" implementation that maps the Partitioning partitionkey attribute value to the 'key' of the Kafka message as-is, if present.
It's named partitionkey, but in fact we're talking about the key of the message
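For context, under that reading the sender only has to put the partitionkey extension on the event, and the interceptor copies it to the Kafka record key as-is. A minimal, illustrative sketch (the attribute values are made up):

```java
import java.net.URI;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

// The partitionkey extension set here is what the interceptor maps, as-is,
// to the Kafka record key.
CloudEvent event = CloudEventBuilder.v1()
    .withId("trade-42")
    .withType("com.example.trade.booked")
    .withSource(URI.create("/trading/desk-1"))
    .withExtension("partitionkey", "counterparty-acme")
    .build();
```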
That's how this is implemented in other SDKs too: https://github.com/cloudevents/sdk-go/blob/master/protocol/kafka_sarama/v2/write_producer_message.go#L53
Why? Spec: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping
I don't see a reference to the number of partitions in the spec.
@pierDipi - because this implementation is broken. @slinkydeveloper - We need to understand both the spec and what it is trying to achieve, and also map it to the technology. The https://github.com/cloudevents/spec/blob/master/extensions/partitioning.md refers to a partitionKey and not a recordKey (perhaps it is not clear enough). I agree the Kafka mapping is also not sufficiently clear; the partitioning extension was a late addition after months of debate. If the record is rekeyed then it is broken - you cannot change the id of a trade, user record, etc. In the case of Kafka the partitionKey is the id (int) of a partition, whereas the spec talks about a string (in the generic sense).
@slinkydeveloper - I can jump on a call to discuss if it would help.
In that case, you just don't have to set the
@bluemonk3y I think the problem you're highlighting is about the spec and not this implementation, since to me the wording of the spec pretty clearly defines a mapping between the partitionkey extension and the Kafka message key. If you think the spec is broken, can you open an issue here so we can discuss it during the CloudEvents weekly meeting? https://github.com/cloudevents/spec/issues In any case, feel free to join the CloudEvents weekly meeting anyway and we can chat about it: https://github.com/cloudevents/spec#meeting-time
@gunnarmorling @duglin - see above. I previously raised an issue about the implementation and it was closed without consulting me, and now I see these guys forging ahead and building something that is broken. The PR should not be merged.
The PR was merged before your comments came in, and it was open for review for more than two days. I'm going to prepare a revert PR and, if we agree the implementation is wrong, we revert it.
This reverts commit db745fd.
@bluemonk3y let's assume you're right and all the reviewers in this PR misinterpreted the spec: how should the partition id be calculated? The spec doesn't address this particular aspect at all.
This reverts commit db745fd. Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
Revert PR #273
@slinkydeveloper - the purpose of the partition-key (id) is to comingle records that need to be associated. For example, there might be trades booked in different accounts that are all owned by the same counterparty. With Kafka, if downstream consumer processing is required to aggregate all trades by counterparty, then they need to be written to the same partition. If the trades are written using default partitioning, then the hash of the key is used during partition assignment and they will be spread over the different partitions. To specify the partition id, a value needs to be generated that can bucket them together. Hence, when producing the trades, the partition id would be:
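As an illustration of the kind of calculation described above (this is an assumption about the intended formula, not something stated in the comment or the spec), the counterparty id could be hashed modulo the topic's partition count, the same positive-hash-modulo scheme Kafka's default partitioner applies to record keys:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

// Illustrative only: bucket all trades of one counterparty into the same partition.
static int partitionFor(String counterpartyId, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(counterpartyId.getBytes(StandardCharsets.UTF_8))) % numPartitions;
}
```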
I see your point, and this is definitely an interesting solution, although the spec doesn't talk about taking the
I see it was merged before your comments. Are you talking about another PR that's related or something?
Very sorry to see this comment. I think the PR author, myself and the community would be happy to hear alternative solutions, alternative PRs, design docs, etc.
Hey @slinkydeveloper, @bluemonk3y et al., the implementation in this PR looks right to me; I also interpret the spec wording in the way that the "partition key" it talks about is to be mapped to the Kafka message key:
This makes sense to me; for @bluemonk3y's trade example, the creator of the event would either have to set
The creator of the event could even manually set the record partition id or create their own interceptor to do so (without enabling the one in this PR, which is of course opt-in).
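A sketch of that manual alternative, assuming the application already knows which partition it wants (the topic name, key and helper name are illustrative):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

import io.cloudevents.CloudEvent;

// Explicitly setting the partition bypasses key-based partition assignment entirely.
static ProducerRecord<String, CloudEvent> toRecord(String topic, int partition, String key, CloudEvent event) {
    return new ProducerRecord<>(topic, partition, key, event);
}
```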
@slinkydeveloper - yep, I agree. Please note, from a Kafka perspective, rewriting keys has consequences:
Ok so do we agree there is no need to revert it?
It would be really nice if you brought these topics to the next CloudEvents meeting.
I guess if the user is completely aware that it is rekeying and not performing partition assignment, then it's up to you. As a Kafka developer, it conflates keying and partitioning, which in turn breaks a fundamental understanding of how I would design a dataflow of related streams.
Ok I'm going to close the revert PR.
@bluemonk3y Why is it though that you speak about re-keying? AFAIU, it's simply providing the Kafka message key when using this SDK to create a CE. I.e. simply keying, not re-keying. I think from a spec PoV, the term "partition key" was chosen as an abstraction from specific transports like Kafka.
@bluemonk3y Please, next time you have any concern, before pinging other community members, wait for the answer from the people involved in the PR (author and reviewers), make sure you have a clear understanding of the spec text, and avoid making any unfortunate comments.
@gunnarmorling - when the record is presented to the interceptor it has a key, and when it leaves the interceptor it has a different key; hence it was rekeyed. It breaks a Kafka fundamental. Also agreed: from a spec PoV, the term "partition key" is an abstraction, and it was taken literally in this implementation. I don't believe the Kafka binding spec is clear enough on the mapping, which is probably the reason this PR is half-baked. @slinkydeveloper - as an FYI, I wrote the spec, so I'm pretty sure I have a clear understanding of what it means and hence why I think it is not clear enough. Also note - I don't appreciate my previous issue on partitioning being closed without consultation.
Ok, I see now where you're coming from. TBH, the overall usage of this SDK in the context of Kafka is still somewhat fuzzy to me; e.g. in the example here, the But then I could simply create the
This implements a `ProducerInterceptor` that sets up the proper partition key from the `CloudEvent` extension.

Fixes #94
Signed-off-by: Francesco Guardiani francescoguard@gmail.com
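For completeness, a hedged usage sketch of how the interceptor from this PR would typically be enabled through Kafka's standard interceptor mechanism; the fully-qualified class names are assumptions based on the snippets above and should be checked against the merged sources:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.cloudevents.kafka.CloudEventSerializer");
// Opt-in: the interceptor only runs if listed here.
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "io.cloudevents.kafka.PartitionKeyExtensionInterceptor");
```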