Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka_confluent] should propagate partitionkey on an event level #1086

Open
bread-amammay opened this issue Aug 14, 2024 · 1 comment
Open

Comments

@bread-amammay
Copy link

bread-amammay commented Aug 14, 2024

Issue

Currently you have to interact with

// WithMessageKey returns back a new context with the given messageKey.
func WithMessageKey(ctx context.Context, messageKey string) context.Context {
return context.WithValue(ctx, keyForMessageKey, messageKey)
}
// MessageKeyFrom looks in the given context and returns `messageKey` as a string if found and valid, otherwise "".
func MessageKeyFrom(ctx context.Context) string {
c := ctx.Value(keyForMessageKey)
if c != nil {
if s, ok := c.(string); ok {
return s
}
}
return ""
}
to set the partition key on an outgoing event. If you have multiple events are you sending on a single context you have to keep swapping values.

Proposal

I propose that the paradigm from kafka_sarama is copied over where it checks to see if the partition key is set on an a cloud event extension, if it is, set that on the outgoing record.

transformers = append(transformers, binding.TransformerFunc(func(r binding.MessageMetadataReader, w binding.MessageMetadataWriter) error {
ext := r.GetExtension(partitionKey)
if !types.IsZero(ext) {
extStr, err := types.Format(ext)
if err != nil {
return err
}
key = extStr
}
return nil
}))

@yanmxa
Copy link
Contributor

yanmxa commented Aug 15, 2024

Thanks @bread-amammay!
I see that both implementations use the same way to inject the partitionKey for the Sender. The difference is that the sarma implementation also adds partitionKey by event extension. But what if the key exists in both context and extension?

I agree with the idea of supporting the partitionKey at the event level. But we need to document their priorities.

Do you want to have a try? :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants