Skip to content

Conversation

@chenmx98
Copy link

What motivated this proposal?

The current nats-kafka-bridge is missing the capability to demux messages from kafka to nats based on the kafka messages' header values. With this change, we can use nats-kafka-bridge as a demuxer by further specifying the patterns or criterions in the kafka message header.

What is the proposed change?

In server/core/connector.go:553 add:

		"getKafkaHeaderValue": func(key string, headers []sarama.RecordHeader)
		 string {
			for _, header := range(headers) {
				if string(header.Key) == key {
					return string(header.Value)
				}
			}
			return ""
		},

Who benefits from this change?

Anyone who is using nats-kafka-bridge as a kafka message demuxer to many nats subjects with similar patterns.

Example

connect: [
  {
      type: "KafkaToNATS",
      brokers: ["localhost:9092"]
      id: "foo",
      topic: "bar",
      subject: "Subject-{{ .Headers | getKafkaHeaderValue \"baz\"}}",
  },
]

@epa095
Copy link

epa095 commented Jan 24, 2025

So, what happened here? I see the issue is closed as completed, but this is still open.

This seems really usefull! Is it maybe already possible, just a bit lacking in documentation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants