Skip to content

Commit

Permalink
Config adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
Rian Josua Masikome committed Apr 27, 2021
1 parent 4aa5344 commit 2cb857b
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 130 deletions.
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,4 @@ zk-multiple-kafka-multiple
.vscode
.idea

config
kminion
config.yaml
ca.pem
config
9 changes: 2 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,11 @@ func main() {
kgoOpts := []kgo.Opt{}
if cfg.Minion.EndToEnd.Enabled {
ack := kgo.AllISRAcks()
switch cfg.Minion.EndToEnd.Producer.RequiredAcks {
case 0:
ack = kgo.NoAck()
case 1:
if cfg.Minion.EndToEnd.Producer.RequiredAcks == 1 {
ack = kgo.LeaderAck()
}
kgoOpts = append(kgoOpts, kgo.RequiredAcks(ack))
if cfg.Minion.EndToEnd.Producer.RequiredAcks != -1 {
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}
kgoOpts = append(kgoOpts, kgo.RequiredAcks(ack))
}
// Create kafka service and check if client can successfully connect to Kafka cluster
kafkaSvc, err := kafka.NewService(cfg.Kafka, logger, kgoOpts)
Expand Down
14 changes: 14 additions & 0 deletions minion/config_endtoend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,23 @@ type EndToEndConfig struct {

func (c *EndToEndConfig) SetDefaults() {
c.Enabled = false
c.ProbeInterval = 2 * time.Second
c.TopicManagement.SetDefaults()
c.Producer.SetDefaults()
c.Consumer.SetDefaults()
}

func (c *EndToEndConfig) Validate() error {

if !c.Enabled {
return nil
}

// If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0
if c.ProbeInterval == 0 {
return fmt.Errorf("failed to validate probeInterval config, the duration can't be zero")
}

err := c.TopicManagement.Validate()
if err != nil {
return fmt.Errorf("failed to validate topicManagement config: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions minion/config_endtoend_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ type EndToEndConsumerConfig struct {
func (c *EndToEndConsumerConfig) SetDefaults() {
c.GroupId = "kminion-end-to-end"
c.RebalancingProtocol = "cooperativeSticky"
latency, _ := time.ParseDuration("20s")
c.LatencySla = latency
c.LatencySla = 20 * time.Second
}

func (c *EndToEndConsumerConfig) Validate() error {
Expand All @@ -33,5 +32,10 @@ func (c *EndToEndConsumerConfig) Validate() error {
return fmt.Errorf("given RebalancingProtocol '%v' is invalid", c.RebalancingProtocol)
}

// If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0
if c.LatencySla == 0 {
return fmt.Errorf("failed to validate consumer.latencySla config, the duration can't be zero")
}

return nil
}
19 changes: 12 additions & 7 deletions minion/config_endtoend_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,24 @@ type EndToEndProducerConfig struct {
}

func (c *EndToEndProducerConfig) SetDefaults() {
latency, _ := time.ParseDuration("5s")
c.LatencySla = latency
c.LatencySla = 5 * time.Second
c.RequiredAcks = -1
}

func (c *EndToEndProducerConfig) Validate() error {
_, err := time.ParseDuration(c.LatencySla.String())
if err != nil {
return fmt.Errorf("failed to parse '%s' to time.Duration: %v", c.LatencySla.String(), err)

// If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0
if c.LatencySla == 0 {
return fmt.Errorf("failed to validate producer.latencySla config, the duration can't be zero")
}

if c.RequiredAcks < -1 || c.RequiredAcks > 1 {
return fmt.Errorf("failed to parse producer.RequiredAcks, valid value is either -1, 0, 1")
switch c.RequiredAcks {
// Only allows -1 All ISR Ack, idempotence EOS on producing message
// or 1 where the Leader Ack is neede, the rest should return error
case -1, 1:
default:
return fmt.Errorf("failed to parse producer.RequiredAcks, valid value is either -1, 1")
}

return nil
}
7 changes: 4 additions & 3 deletions minion/config_endtoend_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type EndToEndTopicConfig struct {
func (c *EndToEndTopicConfig) SetDefaults() {
c.Enabled = true
c.Name = "kminion-end-to-end"
c.ReconcilationInterval = 10 * time.Minute
}

func (c *EndToEndTopicConfig) Validate() error {
Expand All @@ -28,9 +29,9 @@ func (c *EndToEndTopicConfig) Validate() error {
return fmt.Errorf("failed to parse partitionsPerBroker, it should be more than 1, retrieved value %v", c.ReplicationFactor)
}

_, err := time.ParseDuration(c.ReconcilationInterval.String())
if err != nil {
return fmt.Errorf("failed to parse '%s' to time.Duration: %v", c.ReconcilationInterval.String(), err)
// If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0
if c.ReconcilationInterval == 0 {
return fmt.Errorf("failed to validate topic.ReconcilationInterval config, the duration can't be zero")
}

return nil
Expand Down
34 changes: 16 additions & 18 deletions minion/endtoend_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
client := s.kafkaSvc.Client
topicMessage := s.Cfg.EndToEnd.TopicManagement.Name
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicMessage)
balancer := kgo.Balancers(kgo.CooperativeStickyBalancer())
balancer := kgo.Balancers(kgo.CooperativeStickyBalancer()) // Default GroupBalancer
switch s.Cfg.EndToEnd.Consumer.RebalancingProtocol {
case RoundRobin:
balancer = kgo.Balancers(kgo.RoundRobinBalancer())
Expand Down Expand Up @@ -60,23 +60,21 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
latencySec := float64(timeNowMs()-res.Timestamp) / float64(1000)
s.observeLatencyHistogram(latencySec, int(record.Partition))
s.storage.markRecordConsumed(record)
uncommittedOffset := client.UncommittedOffsets()
// Only commit if uncommittedOffset return value
if uncommittedOffset != nil {
startCommitTimestamp := timeNowMs()
client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil {
if err != kgo.ErrNoDial {
s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
}
s.setCachedItem("end_to_end_consumer_offset_availability", false, 120*time.Second)
} else {
commitLatencySec := float64(timeNowMs()-startCommitTimestamp) / float64(1000)
s.observeCommitLatencyHistogram(commitLatencySec, int(record.Partition))
s.setCachedItem("end_to_end_consumer_offset_availability", true, 120*time.Second)
}
})
}
}
uncommittedOffset := client.UncommittedOffsets()
// Only commit if uncommittedOffset returns value
if uncommittedOffset != nil {
startCommitTimestamp := timeNowMs()
client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil {
s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
s.setCachedItem("end_to_end_consumer_offset_availability", false, 120*time.Second)
} else {
commitLatencySec := float64(timeNowMs()-startCommitTimestamp) / float64(1000)
s.observeCommitLatencyHistogram(commitLatencySec, int(record.Partition))
s.setCachedItem("end_to_end_consumer_offset_availability", true, 120*time.Second)
}
})
}
s.setCachedItem("end_to_end_consume_duration", timeNowMs()-startConsumeTimestamp, 120*time.Second)
}
Expand Down
25 changes: 13 additions & 12 deletions minion/endtoend_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package minion

import (
"math"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -20,40 +19,42 @@ func getBucket(cfg Config) []float64 {
return bucket
}

func initEndtoendLatencyHistogram(cfg Config, metricNamespace string) *prometheus.HistogramVec {
histogramVec := promauto.NewHistogramVec(prometheus.HistogramOpts{
func initEndtoendLatencyHistogram(cfg Config, metricNamespace string) *prometheus.Histogram {
histogram := promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: metricNamespace,
Subsystem: "kafka",
Name: "endtoend_latency_seconds",
Help: "Time it has taken to consume a Kafka message via a Consumer Group which KMinion has produced before",
Buckets: getBucket(cfg),
}, []string{"partition_id"})
})

return histogramVec
return &histogram
}

func initCommitLatencyHistogram(cfg Config, metricNamespace string) *prometheus.HistogramVec {
histogramVec := promauto.NewHistogramVec(prometheus.HistogramOpts{
func initCommitLatencyHistogram(cfg Config, metricNamespace string) *prometheus.Histogram {
histogram := promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: metricNamespace,
Subsystem: "kafka",
Name: "commit_latency_seconds",
Help: "Time it has taken to commit the message via a Consumer Group on KMinion Management Topic",
Buckets: getBucket(cfg),
}, []string{"partition_id"})
})

return histogramVec
return &histogram
}

func (s *Service) GetLatencyHistogram() *prometheus.HistogramVec {
func (s *Service) GetLatencyHistogram() *prometheus.Histogram {
return s.endtoendLatencyHistogram
}

func (s *Service) observeLatencyHistogram(time float64, partition int) error {
s.endtoendLatencyHistogram.WithLabelValues(strconv.Itoa(partition)).Observe(time)
h := *s.endtoendLatencyHistogram
h.Observe(time)
return nil
}

func (s *Service) observeCommitLatencyHistogram(time float64, partition int) error {
s.commitLatencyHistogram.WithLabelValues(strconv.Itoa(partition)).Observe(time)
h := *s.commitLatencyHistogram
h.Observe(time)
return nil
}
Loading

0 comments on commit 2cb857b

Please sign in to comment.