
PartitionKey Kafka Interceptor #260

Merged (2 commits, Nov 13, 2020)

Conversation

@slinkydeveloper (Member) commented Nov 11, 2020

This implements a ProducerInterceptor that sets the proper partition key on the record from the CloudEvent partitionkey extension.

Fixes #94

Signed-off-by: Francesco Guardiani francescoguard@gmail.com
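For reference, the key-mapping behavior the description refers to can be sketched as below. This is an illustrative stand-in, not the SDK source: the class and method names are hypothetical, and a plain Map stands in for the CloudEvent's extension attributes.

```java
import java.util.Map;

// Hypothetical sketch of the spec's "Key Mapper" rule: if the CloudEvent
// carries a `partitionkey` extension, its value becomes the Kafka record
// key as-is; otherwise the original key is left untouched.
public class PartitionKeyMapperSketch {

    // `extensions` stands in for the CloudEvent's extension attributes.
    public static Object mapKey(Map<String, Object> extensions, Object originalKey) {
        Object partitionKey = extensions.get("partitionkey");
        return partitionKey != null ? partitionKey : originalKey;
    }

    public static void main(String[] args) {
        // Extension present: it replaces the record key.
        System.out.println(mapKey(Map.of("partitionkey", "party-42"), null));
        // Extension absent: the original key is kept.
        System.out.println(mapKey(Map.of(), "original-key"));
    }
}
```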

@slinkydeveloper slinkydeveloper added the enhancement New feature or request label Nov 11, 2020
@slinkydeveloper slinkydeveloper added this to the 2.0.0.CR1 milestone Nov 11, 2020
@aliok left a comment


/lgtm
/approve

Looks good to me, but I am not in a position to verify if this works as expected. Unit tests look fine though.

@aliok commented Nov 11, 2020

/lgtm

@bluemonk3y left a comment

@slinkydeveloper - Sorry, but this code doesn't look quite right to me. The interceptor should be calculating a partition id based upon its configuration and the incoming record. The assertion should be validating against the partition id instead of the key.

@bluemonk3y left a comment

Please note: this implementation looks like it is rekeying the record; instead it should calculate the partition id.

    private void assertKey(ProducerRecord<Object, CloudEvent> record, Object expectedKey) {
        PartitionKeyExtensionInterceptor interceptor = new PartitionKeyExtensionInterceptor();
        assertThat(interceptor.onSend(record).key())
            .isEqualTo(expectedKey);

@bluemonk3y: It should validate against a calculated partition id and not change the key.

        return record;
    }

    return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), partitionKey, record.value(), record.headers());

@bluemonk3y: the second field should be calculated based upon 1) the number of partitions, 2) the extension, and 3) the input record.

@slinkydeveloper (Member, Author) replied:

I don't think that's what the spec states: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping

I don't quite understand what you think the implementation should look like; can you provide sample code or a PR showing how you would implement it?

@slinkydeveloper (Member, Author) replied:

In particular:

Every implementation SHOULD provide a default "Key Mapper" implementation that maps the Partitioning partitionkey attribute value to the 'key' of the Kafka message as-is, if present.

It's named partitionkey, but in fact we're talking about the key of the message

@slinkydeveloper (Member, Author) commented:

@pierDipi
Copy link
Member

pierDipi commented Nov 13, 2020

instead it should calculate the partition-id

Why?

Spec: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping

Every implementation SHOULD provide a default "Key Mapper" implementation that maps the Partitioning partitionkey attribute value to the 'key' of the Kafka message as-is, if present.


the second field should be calculated based upon 1) the number of partitions, 2) the extension 3) the input record

I don't see a reference to the number of partitions in the spec.

@bluemonk3y commented:

@pierDipi - because this implementation is broken.

@slinkydeveloper - We need to understand both the spec and what it is trying to achieve, and also map it to the technology. The https://github.com/cloudevents/spec/blob/master/extensions/partitioning.md extension refers to a partitionKey and not a recordKey (perhaps it is not clear enough). I agree the Kafka mapping is also not sufficiently clear; the partitioning was a late addition after months of debate.

If the record is rekeyed then it is broken - you cannot change the id of a trade, user record, etc.

In the case of Kafka, the partition key is the id (int) of a partition, whereas the spec talks about a string (in the generic sense).
Below is a snippet from Kafka Streams code producing a record using serdes and also keying the partition (3rd arg).

Produced.with(
    keySerde,
    valueEntrySerde,
    (topic, key, value, numPartitions) ->
        (int) (value.data().someRecordAttributeConvertedToNumber() % numPartitions)));

@slinkydeveloper - I can jump on a call to discuss if it would help.

@slinkydeveloper (Member, Author) commented Nov 13, 2020

If the record is rekeyed then it is broken - you cannot change the id of a trade, user record etc.

In that case, you just don't have to set the partitionkey extension, I guess.

because this implementation is broken.

@bluemonk3y I think the problem you're underlining is about the spec, not this implementation: to me, the wording of the spec defines a mapping between partitionkey and the record key pretty clearly, and it doesn't leave much room for misinterpretation. This was even discussed and made clearer recently in cloudevents/spec#571 and cloudevents/spec#599.

If you think the spec is broken, can you open an issue here so we can discuss it during the CloudEvents weekly meeting? https://github.com/cloudevents/spec/issues

In any case, feel free to join the CloudEvents weekly meeting anyway and we can chat about it https://github.com/cloudevents/spec#meeting-time

@bluemonk3y commented:

@gunnarmorling @duglin - see above. I previously raised an issue about the implementation and it was closed with no consultation back to me, and now I see these guys forging ahead and building something that is broken.

The PR should not be merged.

@slinkydeveloper (Member, Author) commented Nov 13, 2020

The PR was merged before your comments came in, and it was up for review for more than 2 days. I'm going to prepare a revert PR and, if we agree the implementation is wrong, we'll revert it.

@slinkydeveloper slinkydeveloper restored the issues/94 branch November 13, 2020 10:00
slinkydeveloper added a commit that referenced this pull request Nov 13, 2020
@slinkydeveloper (Member, Author) commented:

@bluemonk3y let's assume you're right and all the reviewers in this PR misinterpreted the spec: how should the partition id be calculated? The spec doesn't address this particular aspect at all.

slinkydeveloper added a commit to slinkydeveloper/sdk-java that referenced this pull request Nov 13, 2020
This reverts commit db745fd.

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
@slinkydeveloper (Member, Author) commented:

Revert PR #273

@bluemonk3y commented Nov 13, 2020

@slinkydeveloper - the purpose of the partition id is to co-mingle records that need to be associated. For example, there might be trades booked in different accounts that are all owned by the same counterparty. With Kafka, if downstream consumer processing needs to aggregate all trades by counterparty, then they need to be written to the same partition. If the trades are written using default partitioning, then the hash of the key is used during partition assignment and they will be spread over the different partitions. To specify the partition id, a value needs to be generated that buckets them together. Hence, when producing the trades, the partition id would be: trade.account.partyId % partitionCount. This is the function that the interceptor will provide. HtH
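The calculation described in this comment can be sketched as below. This is a hypothetical stand-alone function, not code from the PR; the `partyId` parameter and the hash-then-modulo scheme are assumptions drawn from the comment above (a raw string needs hashing before the modulo can be applied).

```java
// Hypothetical sketch of the partition-assignment function described above:
// bucket related records (e.g. all trades of one counterparty) onto the
// same partition by reducing a stable attribute modulo the partition count.
public class PartitionIdSketch {

    public static int partitionFor(String partyId, int partitionCount) {
        // Mask the sign bit so a negative hashCode cannot yield a negative partition.
        return (partyId.hashCode() & Integer.MAX_VALUE) % partitionCount;
    }

    public static void main(String[] args) {
        // The same counterparty always lands on the same partition,
        // regardless of which account the trade was booked in.
        System.out.println(partitionFor("party-42", 6) == partitionFor("party-42", 6));
    }
}
```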

@slinkydeveloper (Member, Author) commented:

I see your point, and this is definitely an interesting solution, although the spec doesn't talk about taking the partitionKey, hashing it, and doing the modulo with the partition count.

@aliok commented Nov 13, 2020

I previously raised an issue about the implementation and it was closed with no consultation back to myself

I see it was merged before your comments. Are you talking about another PR that's related or something?

now I see these guys forging ahead and building something that is broken.

Very sorry to see this comment.

I think the PR author, myself and the community would be happy to hear alternative solutions, alternative PRs, design docs, etc.

@gunnarmorling (Contributor) commented:

Hey @slinkydeveloper, @bluemonk3y et al., the implementation in this PR looks right to me; I also interpret the spec wording in the way that the "partition key" it talks about is to be mapped to the Kafka message key:

Every implementation SHOULD provide a default "Key Mapper" implementation that maps the Partitioning partitionkey attribute value to the 'key' of the Kafka message as-is, if present.

This makes sense to me; for @bluemonk3y's trade example, the creator of the event would either have to set partitionkey to the value of trade.account.partyId; or if trades must keep their keys, they'd use a custom partitioner implemented as per your suggestion. I don't think the event creator should by default have to concern themselves with details of the Kafka topic, like number of partitions.

@slinkydeveloper (Member, Author) commented Nov 13, 2020

The creator of the event could either manually set the record partition id or create their own interceptor to do so (without enabling the one in this PR, which is of course opt-in).

@bluemonk3y commented:
@slinkydeveloper - yep, I agree.

Please note: from a Kafka perspective, rewriting keys has consequences:

  • it would be difficult to find or identify rekeyed records
  • compacted topics will likely break
  • if the algorithm changes, then you can't associate previously written records

@slinkydeveloper slinkydeveloper deleted the issues/94 branch November 13, 2020 10:50
@slinkydeveloper (Member, Author) commented:

@slinkydeveloper - yep, I agree.

Ok so do we agree there is no need to revert it?

the danger of rewriting keys has consequences:

It would be really nice if you brought these topics to the next CloudEvents meeting.

@bluemonk3y commented:

I guess if the user is completely aware that it is rekeying and not performing partition assignment, then it's up to you.

As a Kafka developer, I see it conflating keying and partitioning, which in turn breaks a fundamental understanding of how I would design a dataflow of related streams.

@slinkydeveloper (Member, Author) commented:

Ok I'm going to close the revert PR.

@gunnarmorling (Contributor) commented:

@bluemonk3y Why is it though that you speak about re-keying? AFAIU, it's simply providing the Kafka message key when using this SDK to create a CE. I.e. simply keying, not re-keying.

I think from a spec PoV, the term "partition key" was chosen as an abstraction from specific transports like Kafka.

@slinkydeveloper (Member, Author) commented Nov 13, 2020

@bluemonk3y Please next time you have any concern, before pinging other community members, wait for the answer from the people involved in the PR (author and reviewers) and make sure you have a clear understanding of the spec text, and avoid making any unfortunate comments.

@bluemonk3y commented:

@gunnarmorling - when the record is presented to the interceptor it has a key, and when it leaves the interceptor it has a different key - hence it was rekeyed. It breaks a Kafka fundamental.

I also agree that from a spec PoV the term "partition key" is an abstraction, and it was taken literally in this implementation. I don't believe the Kafka binding spec is clear enough on the mapping, which is probably the reason this PR is half-baked.

@slinkydeveloper - as an FYI, I wrote the spec, so I'm pretty sure I have a clear understanding of what it means, and hence why I think it is not clear enough. Also note: I don't appreciate my previous issue on partitioning being closed without consultation.

@gunnarmorling (Contributor) commented:

@gunnarmorling - when the record is presented to the interceptor it has a key, and when it leaves the interceptor it has a different key - hence it was rekeyed. It breaks a Kafka fundamental.

Ok, I see now where you're coming from. Tbh. the overall usage of this SDK in the context of Kafka is still somewhat fuzzy to me; e.g. in the example here, the ProducerRecord is created with a null key, in which case the interceptor (if used) would set the key to the partition key value given in the CE structure.

But then I could simply create the ProducerRecord giving a key, instead of relying on the interceptor for setting it. In that case, the partition key value (if present) needs to match that explicitly given key for the reasons you describe.
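The rule sketched in the two paragraphs above can be written down as a small helper. This is an illustrative sketch, not SDK code; the class, the method name `effectiveKey`, and the conflict-check behavior are assumptions drawn from the discussion (the spec does not prescribe what happens on a mismatch).

```java
// Hypothetical sketch: a null record key is filled in from the partitionkey
// extension; an explicitly supplied key must agree with the extension when
// both are present, to avoid silently rekeying the record.
public class KeyConsistencySketch {

    public static String effectiveKey(String explicitKey, String partitionKeyExt) {
        if (explicitKey == null) {
            return partitionKeyExt; // the interceptor supplies the key
        }
        if (partitionKeyExt != null && !partitionKeyExt.equals(explicitKey)) {
            throw new IllegalStateException(
                "partitionkey extension conflicts with the explicit record key");
        }
        return explicitKey; // the explicit key is kept unchanged
    }
}
```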

Labels: enhancement (New feature or request)

Successfully merging this pull request may close these issues:

Kafka transport does not respect the partitioning extension

6 participants