Skip to content

Commit 3f47233

Browse files
authored
fix(kafka): Fixes partition selection in distributors (#14242)
1 parent b6e9945 commit 3f47233

File tree

1 file changed

+5
-6
lines changed

1 file changed

+5
-6
lines changed

pkg/distributor/distributor.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -901,12 +901,11 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
901901
if len(stream.Stream.Entries) == 0 {
902902
return nil
903903
}
904-
/* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
905-
if err != nil {
906-
d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
907-
return fmt.Errorf("failed to find active partition for stream: %w", err)
908-
}*/
909-
partitionID := int32(0)
904+
partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
905+
if err != nil {
906+
d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
907+
return fmt.Errorf("failed to find active partition for stream: %w", err)
908+
}
910909

911910
startTime := time.Now()
912911

0 commit comments

Comments
 (0)