Adding support for specifying input Kafka offset for each partition (#1242)
Adds a new flag `--input-kafka-offset` to support consuming from Kafka starting at a specified offset.
g0ne150 authored Apr 9, 2024
1 parent 79e123c commit abbd249
Showing 6 changed files with 111 additions and 19 deletions.
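With this change, an invocation along the lines of `gor --input-kafka-host '192.168.0.1:9092' --input-kafka-topic 'topic' --input-kafka-offset "-2"` (host and topic are placeholders) should start replaying from the oldest retained messages. A minimal sketch of what the offset values mean, assuming the parsed integer is handed straight to sarama's `ConsumePartition`, as the diff below does:

```go
// Illustrative only: sarama reserves -1 and -2 as symbolic offsets, so the
// value parsed from --input-kafka-offset can be passed directly to
// ConsumePartition.
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	fmt.Println(sarama.OffsetNewest) // -1: begin at the next message produced
	fmt.Println(sarama.OffsetOldest) // -2: begin at the oldest retained message
	// Any value >= 0 is treated as an absolute offset within the partition.
}
```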
83 changes: 73 additions & 10 deletions input_kafka.go
@@ -3,7 +3,9 @@ package goreplay
import (
"encoding/json"
"log"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
@@ -12,14 +14,24 @@ import (
// KafkaInput is used for receiving Kafka messages and
// transforming them into HTTP payloads.
type KafkaInput struct {
config *InputKafkaConfig
consumers []sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
quit chan struct{}
config *InputKafkaConfig
consumers []sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
speedFactor float64
quit chan struct{}
kafkaTimer *kafkaTimer
}

// getOffsetOfPartitions parses the --input-kafka-offset value; -1 (newest) and -2 (oldest) are sarama's reserved offsets.
func getOffsetOfPartitions(offsetCfg string) int64 {
offset, err := strconv.ParseInt(offsetCfg, 10, 64)
if err != nil || offset < -2 {
log.Fatalln("Failed to parse offset: "+offsetCfg, err)
}
return offset
}

// NewKafkaInput creates an instance of the Kafka consumer client with TLS config
func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
func NewKafkaInput(offsetCfg string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
c := NewKafkaConfig(&config.SASLConfig, tlsConfig)

var con sarama.Consumer
@@ -41,14 +53,17 @@ func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig
}

i := &KafkaInput{
config: config,
consumers: make([]sarama.PartitionConsumer, len(partitions)),
messages: make(chan *sarama.ConsumerMessage, 256),
quit: make(chan struct{}),
config: config,
consumers: make([]sarama.PartitionConsumer, len(partitions)),
messages: make(chan *sarama.ConsumerMessage, 256),
speedFactor: 1,
quit: make(chan struct{}),
kafkaTimer: new(kafkaTimer),
}
i.config.Offset = offsetCfg

for index, partition := range partitions {
consumer, err := con.ConsumePartition(config.Topic, partition, sarama.OffsetNewest)
consumer, err := con.ConsumePartition(config.Topic, partition, getOffsetOfPartitions(offsetCfg))
if err != nil {
log.Fatalln("Failed to start Sarama(Kafka) partition consumer:", err)
}
@@ -86,12 +101,15 @@ func (i *KafkaInput) PluginRead() (*Message, error) {
case message = <-i.messages:
}

inputTs := ""

msg.Data = message.Value
if i.config.UseJSON {

var kafkaMessage KafkaMessage
json.Unmarshal(message.Value, &kafkaMessage)

inputTs = kafkaMessage.ReqTs
var err error
msg.Data, err = kafkaMessage.Dump()
if err != nil {
@@ -103,8 +121,11 @@ func (i *KafkaInput) PluginRead() (*Message, error) {
// does it have meta
if isOriginPayload(msg.Data) {
msg.Meta, msg.Data = payloadMetaWithBody(msg.Data)
inputTs = string(payloadMeta(msg.Meta)[2])
}

i.timeWait(inputTs)

return &msg, nil

}
@@ -118,3 +139,45 @@ func (i *KafkaInput) Close() error {
close(i.quit)
return nil
}

// timeWait sleeps long enough that messages are emitted with the same spacing as they were recorded, scaled by speedFactor.
func (i *KafkaInput) timeWait(curInputTs string) {
if i.config.Offset == "-1" || curInputTs == "" {
return
}

// Implements slowdown or speedup of emitting for the Kafka input
timer := i.kafkaTimer
curTs := time.Now().UnixNano()

curInput, err := strconv.ParseInt(curInputTs, 10, 64)
if err != nil {
log.Fatalln("Failed to parse timestamp:", err)
}

if timer.latestInputTs == 0 || timer.latestOutputTs == 0 {
timer.latestInputTs = curInput
timer.latestOutputTs = curTs
return
}

diffTs := curInput - timer.latestInputTs
pastTs := curTs - timer.latestOutputTs

diff := diffTs - pastTs
if i.speedFactor != 1 {
diff = int64(float64(diff) / i.speedFactor)
}

if diff > 0 {
time.Sleep(time.Duration(diff))
}

timer.latestInputTs = curInput
timer.latestOutputTs = curTs
}

// kafkaTimer keeps the most recent input and output timestamps used for pacing.
type kafkaTimer struct {
latestInputTs int64
latestOutputTs int64
}
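A self-contained sketch of the pacing arithmetic in `timeWait`, with made-up numbers: a 100ms gap between recorded requests, 20ms of wall-clock time already elapsed, and a 200% limit giving `speedFactor = 2`:

```go
// Minimal sketch of the timeWait math (values are illustrative).
package main

import (
	"fmt"
	"time"
)

func main() {
	diffTs := int64(100 * time.Millisecond) // gap between recorded input timestamps
	pastTs := int64(20 * time.Millisecond)  // wall-clock time since the last emit
	speedFactor := 2.0                      // e.g. from a "|200%" limiter option

	diff := diffTs - pastTs
	if speedFactor != 1 {
		diff = int64(float64(diff) / speedFactor)
	}
	fmt.Println(time.Duration(diff)) // 40ms to sleep before emitting the next message
}
```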
4 changes: 2 additions & 2 deletions input_kafka_test.go
@@ -16,7 +16,7 @@ func TestInputKafkaRAW(t *testing.T) {
map[string][]int32{"test": {0}},
)

input := NewKafkaInput("", &InputKafkaConfig{
input := NewKafkaInput("-1", &InputKafkaConfig{
consumer: consumer,
Topic: "test",
UseJSON: false,
@@ -42,7 +42,7 @@ func TestInputKafkaJSON(t *testing.T) {
map[string][]int32{"test": {0}},
)

input := NewKafkaInput("", &InputKafkaConfig{
input := NewKafkaInput("-1", &InputKafkaConfig{
consumer: consumer,
Topic: "test",
UseJSON: true,
1 change: 1 addition & 0 deletions kafka.go
@@ -31,6 +31,7 @@ type InputKafkaConfig struct {
Host string `json:"input-kafka-host"`
Topic string `json:"input-kafka-topic"`
UseJSON bool `json:"input-kafka-json-format"`
Offset string `json:"input-kafka-offset"`
SASLConfig SASLKafkaConfig
}

39 changes: 33 additions & 6 deletions limiter.go
@@ -31,6 +31,22 @@ func parseLimitOptions(options string) (limit int, isPercent bool) {
return
}

// newLimiterExceptions configures inputs that implement their own rate limiting (FileInput, KafkaInput) by turning a percentage limit into a speed factor.
func newLimiterExceptions(l *Limiter) {
if !l.isPercent {
return
}
speedFactor := float64(l.limit) / float64(100)

// FileInput and KafkaInput have their own rate limiting. Unlike other inputs, instead of just dropping requests we can slow down or speed up request emission.
switch input := l.plugin.(type) {
case *FileInput:
input.speedFactor = speedFactor
case *KafkaInput:
input.speedFactor = speedFactor
}
}

// NewLimiter constructor for Limiter, accepts plugin and options
// `options` allows specifying relative or absolute limiting
func NewLimiter(plugin interface{}, options string) PluginReadWriter {
@@ -39,17 +55,28 @@ func NewLimiter(plugin interface{}, options string) PluginReadWriter {
l.plugin = plugin
l.currentTime = time.Now().UnixNano()

// FileInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion.
if fi, ok := l.plugin.(*FileInput); ok && l.isPercent {
fi.speedFactor = float64(l.limit) / float64(100)
}
newLimiterExceptions(l)

return l
}

// isLimitedExceptions reports whether the plugin applies percentage limiting itself.
func (l *Limiter) isLimitedExceptions() bool {
if !l.isPercent {
return false
}
// FileInput and KafkaInput have their own limiting algorithms
switch l.plugin.(type) {
case *FileInput:
return true
case *KafkaInput:
return true
default:
return false
}
}

func (l *Limiter) isLimited() bool {
// File input have its own limiting algorithm
if _, ok := l.plugin.(*FileInput); ok && l.isPercent {
if l.isLimitedExceptions() {
return false
}

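To make the limiter change concrete: for `FileInput` and `KafkaInput` a percentage limit becomes a speed factor instead of a drop probability. A tiny sketch with an assumed 50% limit:

```go
// Sketch only: how a relative limit maps onto speedFactor for the two
// "exception" inputs.
package main

import "fmt"

func main() {
	limit := 50 // a "50%" limit as parsed by parseLimitOptions
	speedFactor := float64(limit) / float64(100)
	fmt.Println(speedFactor) // 0.5: timeWait sleeps twice as long, halving the replay rate
}
```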
2 changes: 1 addition & 1 deletion plugins.go
@@ -159,7 +159,7 @@ func NewPlugins() *InOutPlugins {
}

if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
plugins.registerPlugin(NewKafkaInput, Settings.InputKafkaConfig.Offset, &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
}

return plugins
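For context (not part of this diff): the offset value is passed to `registerPlugin` as the plugin's string option, the same slot where file inputs carry a `path|limit` string, which is presumably why the flag's help text shows `-2|200%`. A rough illustration of splitting such an option, with the parsing details assumed rather than taken from this commit:

```go
// Illustrative only: split an option such as "-2|200%" into an offset part and
// a relative limit part. The real splitting lives in goreplay's plugin
// registration and limiter code, which this commit does not touch.
package main

import (
	"fmt"
	"strings"
)

func main() {
	opt := "-2|200%"
	parts := strings.SplitN(opt, "|", 2)
	fmt.Println(parts[0]) // "-2"   -> start from the oldest offset
	fmt.Println(parts[1]) // "200%" -> relative limit, i.e. speedFactor 2
}
```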
1 change: 1 addition & 0 deletions settings.go
@@ -250,6 +250,7 @@ func init() {
flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Mechanism, "input-kafka-mechanism", "", "mechanism\n\tgor --input-raw :8080 --output-kafka-mechanism 'SCRAM-SHA-512'")
flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Username, "input-kafka-username", "", "username\n\tgor --input-raw :8080 --output-kafka-username 'username'")
flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Password, "input-kafka-password", "", "password\n\tgor --input-raw :8080 --output-kafka-password 'password'")
flag.StringVar(&Settings.InputKafkaConfig.Offset, "input-kafka-offset", "-1", "Specify the offset from which Kafka partitions start consuming\n\t-1: start from newest, -2: start from oldest\nSlowdown or speedup of emitting is also supported\n\tgor --input-kafka-offset \"-2|200%\"")

flag.StringVar(&Settings.KafkaTLSConfig.CACert, "kafka-tls-ca-cert", "", "CA certificate for Kafka TLS Config:\n\tgor --input-raw :3000 --output-kafka-host '192.168.0.1:9092' --output-kafka-topic 'topic' --kafka-tls-ca-cert cacert.cer.pem --kafka-tls-client-cert client.cer.pem --kafka-tls-client-key client.key.pem")
flag.StringVar(&Settings.KafkaTLSConfig.ClientCert, "kafka-tls-client-cert", "", "Client certificate for Kafka TLS Config (mandatory with to kafka-tls-ca-cert and kafka-tls-client-key)")
