Closed
Labels
proposal (Enhancement idea or proposal)
Description
What motivated this proposal?
The current nats-kafka-bridge cannot demux messages from Kafka to NATS based on the values of the Kafka messages' headers. With this change, nats-kafka-bridge can act as a demuxer by referencing Kafka message header values in the subject pattern.
What is the proposed change?
In server/core/connector.go:553, add:
    // Returns the value of the first header whose key matches, or "" if none does.
    "getKafkaHeaderValue": func(key string, headers []sarama.RecordHeader) string {
        for _, header := range headers {
            if string(header.Key) == key {
                return string(header.Value)
            }
        }
        return ""
    },
Who benefits from this change?
Anyone who uses nats-kafka-bridge to demux Kafka messages onto many NATS subjects that follow a similar pattern.
Example
connect: [
  {
    type: "KafkaToNATS",
    brokers: ["localhost:9092"],
    id: "foo",
    topic: "bar",
    subject: "Subject-{{ .Headers | getKafkaHeaderValue \"baz\"}}",
  },
]
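To illustrate how the subject pattern above would resolve, here is a minimal standalone sketch using Go's text/template. It is not the bridge's actual code: `RecordHeader` is a local stand-in for sarama.RecordHeader (so the example has no external dependencies), `renderSubject` is a hypothetical helper, and it assumes the template data exposes a `.Headers` field holding a slice of such headers. Because a template pipeline passes the piped value as the last argument, `.Headers | getKafkaHeaderValue "baz"` calls the function with the key first and the headers second.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// RecordHeader is a local stand-in for sarama.RecordHeader (assumption:
// used here only to keep the sketch free of external dependencies).
type RecordHeader struct {
	Key   []byte
	Value []byte
}

// getKafkaHeaderValue returns the value of the first header whose key
// matches, or "" when no header matches.
func getKafkaHeaderValue(key string, headers []RecordHeader) string {
	for _, header := range headers {
		if string(header.Key) == key {
			return string(header.Value)
		}
	}
	return ""
}

// renderSubject is a hypothetical helper that renders a subject pattern
// against a message's headers, with getKafkaHeaderValue registered as a
// template function.
func renderSubject(pattern string, headers []RecordHeader) (string, error) {
	funcs := template.FuncMap{"getKafkaHeaderValue": getKafkaHeaderValue}
	tmpl, err := template.New("subject").Funcs(funcs).Parse(pattern)
	if err != nil {
		return "", err
	}
	// Assumed shape of the template data: a struct with a Headers field.
	data := struct{ Headers []RecordHeader }{Headers: headers}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	headers := []RecordHeader{{Key: []byte("baz"), Value: []byte("orders")}}
	subject, err := renderSubject(`Subject-{{ .Headers | getKafkaHeaderValue "baz" }}`, headers)
	if err != nil {
		panic(err)
	}
	fmt.Println(subject) // prints "Subject-orders"
}
```

In this sketch, a message without a "baz" header renders as "Subject-", so a real deployment would probably want to decide how such messages should be routed.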