Skip to content

Commit 119525b

Browse files
committed
Merge branch 'master' into gcpsecret
2 parents 544c04f + e3e27d9 commit 119525b

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

pubsub/kafkapubsub/kafka.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,20 +229,20 @@ func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
229229
// Convert the messages to a slice of sarama.ProducerMessage.
230230
ms := make([]*sarama.ProducerMessage, 0, len(dms))
231231
for _, dm := range dms {
232-
var kafkaKey []byte
232+
var kafkaKey sarama.Encoder
233233
var headers []sarama.RecordHeader
234234
for k, v := range dm.Metadata {
235235
if k == t.opts.KeyName {
236236
// Use this key's value as the Kafka message key instead of adding it
237237
// to the headers.
238-
kafkaKey = []byte(v)
238+
kafkaKey = sarama.ByteEncoder(v)
239239
} else {
240240
headers = append(headers, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
241241
}
242242
}
243243
pm := &sarama.ProducerMessage{
244244
Topic: t.topicName,
245-
Key: sarama.ByteEncoder(kafkaKey),
245+
Key: kafkaKey,
246246
Value: sarama.ByteEncoder(dm.Body),
247247
Headers: headers,
248248
}

0 commit comments

Comments
 (0)