many changes:
- new metrics (in 3 groups): low-level, SLA indicators, latency histograms
- don't put metrics results in a cache, instead directly increment/observe/set on promauto objects (a sketch of this follows the changed-files summary below)
- only generate minionID once, don't create a new one for every message
- actually check minionID to filter messages from other kminion instances
- check timestamp to discard old messages and establish roundtrip latency
- keep track of last received message timestamp, so we can detect/skip cases where we receive older messages for whatever reason
- don't call storage.markRecordConsumed since that's only for offset_consumer
rikimaru0345 committed Apr 29, 2021
1 parent 4ae6088 commit c8e360a
Showing 11 changed files with 209 additions and 247 deletions.
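
The metrics rework replaces the old cache-based reporting with collectors that are incremented, observed, or set directly. Below is a minimal sketch of how such promauto collectors might be declared, one example per group named in the commit message; the metric names, help strings, and buckets are assumptions based on the field names used in the diffs below (s.endToEndMessagesProduced, s.endToEndWithinRoundtripSla, s.endToEndRoundtripLatency, ...), not the commit's actual declarations.

```go
package minion

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Hypothetical declarations; the commit wires equivalents onto the Service struct.
var (
	// Group 1: low-level counters
	endToEndMessagesProduced = promauto.NewCounter(prometheus.CounterOpts{
		Name: "kminion_end_to_end_messages_produced_total",
		Help: "Messages produced to the end-to-end management topic",
	})
	endToEndMessagesReceived = promauto.NewCounter(prometheus.CounterOpts{
		Name: "kminion_end_to_end_messages_received_total",
		Help: "Matching messages consumed back from the management topic",
	})

	// Group 2: SLA indicator gauges (1 = within SLA, 0 = violated)
	endToEndWithinRoundtripSla = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "kminion_end_to_end_within_roundtrip_sla",
		Help: "Whether the last roundtrip completed within the configured SLA",
	})

	// Group 3: latency histograms
	endToEndRoundtripLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "kminion_end_to_end_roundtrip_latency_seconds",
		Help:    "Time from producing a probe message until consuming it again",
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
	})
)
```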
4 changes: 3 additions & 1 deletion .gitignore
@@ -18,4 +18,6 @@ zk-multiple-kafka-multiple
config

# go debug binary
__debug_bin
__debug_bin

notes.md
20 changes: 13 additions & 7 deletions minion/config_endtoend_consumer.go
@@ -13,15 +13,18 @@ const (
)

type EndToEndConsumerConfig struct {
GroupId string `koanf:"groupId"`
LatencySla time.Duration `koanf:"latencySla"`
RebalancingProtocol string `koanf:"rebalancingProtocol"`
GroupId string `koanf:"groupId"`
RebalancingProtocol string `koanf:"rebalancingProtocol"`

RoundtripSla time.Duration `koanf:"roundtripSla"`
CommitSla time.Duration `koanf:"commitSla"`
}

func (c *EndToEndConsumerConfig) SetDefaults() {
c.GroupId = "kminion-end-to-end"
c.RebalancingProtocol = "cooperativeSticky"
c.LatencySla = 20 * time.Second
c.RoundtripSla = 20 * time.Second
c.CommitSla = 10 * time.Second // no idea what to use as a good default value
}

func (c *EndToEndConsumerConfig) Validate() error {
@@ -32,9 +35,12 @@ func (c *EndToEndConsumerConfig) Validate() error {
return fmt.Errorf("given RebalancingProtocol '%v' is invalid", c.RebalancingProtocol)
}

// If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0
if c.LatencySla == 0 {
return fmt.Errorf("failed to validate consumer.latencySla config, the duration can't be zero")
if c.RoundtripSla <= 0 {
return fmt.Errorf("consumer.roundtripSla must be greater than zero")
}

if c.CommitSla <= 0 {
return fmt.Errorf("consumer.commitSla must be greater than zero")
}

return nil
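
For illustration, a small assumed test (not part of the commit) showing how the reworked validation behaves: the defaults pass, while a zero SLA duration is rejected.

```go
package minion

import "testing"

func TestEndToEndConsumerConfigValidate(t *testing.T) {
	var cfg EndToEndConsumerConfig
	cfg.SetDefaults() // cooperativeSticky protocol, 20s roundtrip SLA, 10s commit SLA

	if err := cfg.Validate(); err != nil {
		t.Fatalf("defaults should validate, got: %v", err)
	}

	// "0s", "0ms" and similar all parse to a zero duration, which is now rejected.
	cfg.CommitSla = 0
	if err := cfg.Validate(); err == nil {
		t.Fatal("expected an error for a zero commitSla")
	}
}
```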
18 changes: 7 additions & 11 deletions minion/config_endtoend_producer.go
@@ -6,28 +6,24 @@ import (
)

type EndToEndProducerConfig struct {
LatencySla time.Duration `koanf:"latencySla"`
AckSla time.Duration `koanf:"ackSla"`
RequiredAcks int `koanf:"requiredAcks"`
}

func (c *EndToEndProducerConfig) SetDefaults() {
c.LatencySla = 5 * time.Second
c.AckSla = 5 * time.Second
c.RequiredAcks = -1
}

func (c *EndToEndProducerConfig) Validate() error {

// If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0
if c.LatencySla == 0 {
return fmt.Errorf("failed to validate producer.latencySla config, the duration can't be zero")
if c.AckSla <= 0 {
return fmt.Errorf("producer.ackSla must be greater than zero")
}

switch c.RequiredAcks {
// Only allows -1 All ISR Ack, idempotence EOS on producing message
// or 1 where the Leader Ack is neede, the rest should return error
case -1, 1:
default:
return fmt.Errorf("failed to parse producer.RequiredAcks, valid value is either -1, 1")
// all(-1) or leader(1)
if c.RequiredAcks != -1 && c.RequiredAcks != 1 {
return fmt.Errorf("producer.requiredAcks must be 1 or -1")
}

return nil
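
The accepted requiredAcks values (-1 for all in-sync replicas, 1 for leader-only) map naturally onto franz-go producer options. A hedged sketch of that mapping follows; the helper and the mapping are assumptions, since the commit itself only validates the value.

```go
package minion

import "github.com/twmb/franz-go/pkg/kgo"

// producerAcksOpts is a hypothetical helper translating the validated
// requiredAcks config value into kgo client options.
func producerAcksOpts(requiredAcks int) []kgo.ProducerOpt {
	if requiredAcks == 1 {
		// Leader-only acks; franz-go's idempotent producer requires acks=all,
		// so idempotence has to be disabled in this mode.
		return []kgo.ProducerOpt{
			kgo.RequiredAcks(kgo.LeaderAck()),
			kgo.DisableIdempotentWrite(),
		}
	}
	// -1: wait for all in-sync replicas (compatible with idempotent writes).
	return []kgo.ProducerOpt{kgo.RequiredAcks(kgo.AllISRAcks())}
}
```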
68 changes: 50 additions & 18 deletions minion/endtoend_consumer.go
@@ -13,8 +13,8 @@ import (

func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
client := s.kafkaSvc.Client
topicMessage := s.Cfg.EndToEnd.TopicManagement.Name
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicMessage)
topicName := s.Cfg.EndToEnd.TopicManagement.Name
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName)
balancer := kgo.Balancers(kgo.CooperativeStickyBalancer()) // Default GroupBalancer
switch s.Cfg.EndToEnd.Consumer.RebalancingProtocol {
case RoundRobin:
@@ -25,16 +25,18 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
balancer = kgo.Balancers(kgo.StickyBalancer())
}
client.AssignPartitions(topic)
client.AssignGroup(s.Cfg.EndToEnd.Consumer.GroupId, kgo.GroupTopics(topicMessage), balancer)
s.logger.Info("starting to consume topicManagement")

// todo: use minionID as part of group id
//
client.AssignGroup(s.Cfg.EndToEnd.Consumer.GroupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
s.logger.Info("Starting to consume " + topicName)

for {
select {
case <-ctx.Done():
return nil
default:
startConsumeTimestamp := timeNowMs()
fetches := client.PollFetches(ctx)
fetches := client.PollRecords(ctx, 10)
errors := fetches.Errors()
for _, err := range errors {
// Log all errors and continue afterwards as we might get errors and still have some fetch results
@@ -44,6 +46,10 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
zap.Error(err.Err))
}

receiveTimestamp := timeNowMs()

//
// Process messages
iter := fetches.RecordIter()
var record *kgo.Record
for !iter.Done() {
@@ -53,30 +59,56 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
continue
}

res := TopicManagementRecord{}
json.Unmarshal(record.Value, &res)
// Deserialize message
var msg TopicManagementRecord
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
continue // failed, maybe sent by an older version?
}

if msg.MinionID != s.minionID {
continue // we didn't send this message
}

if msg.Timestamp < s.lastRoundtripTimestamp {
continue // received an older message
}

// Push the latency to endtoendLatencies that will be consumed by prometheus later
latencySec := float64(timeNowMs()-res.Timestamp) / float64(1000)
s.observeLatencyHistogram(latencySec, int(record.Partition))
s.storage.markRecordConsumed(record)
latencyMs := receiveTimestamp - msg.Timestamp
if latencyMs > s.Cfg.EndToEnd.Consumer.RoundtripSla.Milliseconds() {
s.endToEndWithinRoundtripSla.Set(0) // we're no longer within the roundtrip sla
continue // message is too old
}

// Message is a match and arrived in time!
s.lastRoundtripTimestamp = msg.Timestamp
s.endToEndMessagesReceived.Inc()
s.endToEndRoundtripLatency.Observe(float64(latencyMs) / 1000)
}
uncommittedOffset := client.UncommittedOffsets()
// Only commit if uncommittedOffset returns value
if uncommittedOffset != nil {

//
// Commit offsets for processed messages
if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

startCommitTimestamp := timeNowMs()

client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
// got commit response
if err != nil {
s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
s.setCachedItem("end_to_end_consumer_offset_availability", false, 120*time.Second)
} else {
commitLatencySec := float64(timeNowMs()-startCommitTimestamp) / float64(1000)
s.observeCommitLatencyHistogram(commitLatencySec, int(record.Partition))
s.setCachedItem("end_to_end_consumer_offset_availability", true, 120*time.Second)
s.endToEndCommitLatency.Observe(commitLatencySec)
s.endToEndMessagesCommitted.Inc()

if commitLatencySec <= s.Cfg.EndToEnd.Consumer.CommitSla.Seconds() {
s.endToEndWithinCommitSla.Set(1)
} else {
s.endToEndWithinCommitSla.Set(0)
}
}
})
}
s.setCachedItem("end_to_end_consume_duration", timeNowMs()-startConsumeTimestamp, 120*time.Second)
}
}

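
Condensed for readability, the three discard conditions applied to each record above could be expressed as a single predicate. This helper is hypothetical and not present in the commit; it reuses only names visible in the diff.

```go
package minion

// acceptRoundtripMessage reports whether a consumed record counts as a valid
// roundtrip: it must come from this kminion instance, must not be older than
// the last accepted message, and must arrive within the roundtrip SLA.
func (s *Service) acceptRoundtripMessage(msg TopicManagementRecord, receiveTimestampMs int64) bool {
	if msg.MinionID != s.minionID {
		return false // produced by another kminion instance
	}
	if msg.Timestamp < s.lastRoundtripTimestamp {
		return false // stale: older than the last message we accepted
	}
	if receiveTimestampMs-msg.Timestamp > s.Cfg.EndToEnd.Consumer.RoundtripSla.Milliseconds() {
		s.endToEndWithinRoundtripSla.Set(0) // roundtrip SLA violated
		return false
	}
	return true
}
```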
60 changes: 0 additions & 60 deletions minion/endtoend_histogram.go

This file was deleted.

59 changes: 20 additions & 39 deletions minion/endtoend_producer.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kgo"
@@ -15,44 +14,50 @@ type TopicManagementRecord struct {
Timestamp int64 `json:"timestamp"`
}

func (s *Service) ProduceToManagementTopic(ctx context.Context) (*string, error) {
func (s *Service) produceToManagementTopic(ctx context.Context) error {

topicName := s.Cfg.EndToEnd.TopicManagement.Name
minionID := uuid.NewString()
record, err := newManagementTopicRecord(topicName, minionID)
produceCounts := s.ProduceCounts(ctx)
produceAcks := s.ProduceAcks(ctx)

record, err := createEndToEndRecord(topicName, s.minionID)
if err != nil {
return nil, err
return err
}

for {
select {
case <-ctx.Done():
return nil, nil
return nil
default:
startTime := timeNowMs()
// This function abstract the producing message to all partitions
s.setCachedItem("end_to_end_produce_counts", produceCounts+1, 120*time.Second)
s.endToEndMessagesProduced.Inc()

err = s.kafkaSvc.Client.Produce(ctx, record, func(r *kgo.Record, err error) {
endTime := timeNowMs()
ackDuration := endTime - startTime

if err != nil {
fmt.Printf("record had a produce error: %v\n", err)
} else {
s.setCachedItem("end_to_end_produce_acks", produceAcks+1, 120*time.Second)
s.setCachedItem("end_to_end_produce_duration", timeNowMs()-startTime, 120*time.Second)
s.endToEndMessagesAcked.Inc()

if ackDuration < s.Cfg.EndToEnd.Producer.AckSla.Milliseconds() {
s.endToEndWithinRoundtripSla.Set(1)
} else {
s.endToEndWithinRoundtripSla.Set(0)
}
}
})

if err != nil {
return nil, err
return err
}
return &minionID, nil
return nil
}
}

}

func newManagementTopicRecord(topicName string, minionID string) (*kgo.Record, error) {
func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error) {

timestamp := timeNowMs()
message := TopicManagementRecord{
@@ -71,27 +76,3 @@ func newManagementTopicRecord(topicName string, minionID string) (*kgo.Record, e

return record, nil
}

func (s *Service) ProduceDurationMs(ctx context.Context) (int64, bool) {
ms, exists := s.getCachedItem("end_to_end_produce_duration")
if exists {
return ms.(int64), true
}
return 0, false
}

func (s *Service) ProduceCounts(ctx context.Context) int64 {
counts, exists := s.getCachedItem("end_to_end_produce_counts")
if exists {
return counts.(int64)
}
return 0
}

func (s *Service) ProduceAcks(ctx context.Context) int64 {
acks, exists := s.getCachedItem("end_to_end_produce_acks")
if exists {
return acks.(int64)
}
return 0
}
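
Per the commit message, the minion ID is now generated once and embedded into every probe record via createEndToEndRecord, so the consumer can recognize its own messages. A rough sketch of what that one-time initialization might look like; the constructor name is hypothetical, and only the minionID and lastRoundtripTimestamp fields actually appear in the diff.

```go
package minion

import "github.com/google/uuid"

// newEndToEndService is a hypothetical constructor excerpt; the real Service
// has more fields. The ID is created exactly once, so every message produced
// by this process carries the same minionID.
func newEndToEndService() *Service {
	return &Service{
		minionID:               uuid.NewString(), // generated once, reused for every probe message
		lastRoundtripTimestamp: 0,                // no roundtrip observed yet
	}
}
```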
6 changes: 3 additions & 3 deletions minion/endtoend_topic.go
@@ -123,7 +123,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
return nil
}

func getTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig {
func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig {

minISRConf := kmsg.NewCreateTopicsRequestTopicConfig()
minISR := strconv.Itoa(cfgTopic.ReplicationFactor)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *Service) createManagementTopic(ctx context.Context, topicMetadata *kmsg
s.logger.Info(fmt.Sprintf("creating topic %s for EndToEnd metrics", s.Cfg.EndToEnd.TopicManagement.Name))

cfgTopic := s.Cfg.EndToEnd.TopicManagement
topicConfigs := getTopicConfig(cfgTopic)
topicConfigs := createTopicConfig(cfgTopic)

topic := kmsg.NewCreateTopicsRequestTopic()
topic.Topic = cfgTopic.Name
@@ -241,7 +241,7 @@ func (s *Service) initEndToEnd(ctx context.Context) {

t := time.NewTicker(s.Cfg.EndToEnd.ProbeInterval)
for range t.C {
s.ProduceToManagementTopic(ctx)
s.produceToManagementTopic(ctx)
}
}
}
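
initEndToEnd drives the probe loop from a time.Ticker at the configured probe interval. For reference, here is a cancellation-aware variant of that loop (not part of this commit) that stops cleanly when the context is done.

```go
package minion

import (
	"context"
	"time"

	"go.uber.org/zap"
)

// runProbeLoop is a hypothetical variant of the ticker loop in initEndToEnd:
// it produces one probe message per interval and exits when ctx is cancelled.
func (s *Service) runProbeLoop(ctx context.Context) {
	t := time.NewTicker(s.Cfg.EndToEnd.ProbeInterval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := s.produceToManagementTopic(ctx); err != nil {
				s.logger.Error("end-to-end probe produce failed", zap.Error(err))
			}
		}
	}
}
```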