From c4c4a8d963d76d2b1dffde248e730d40b95e0c00 Mon Sep 17 00:00:00 2001 From: Chodor Marek Date: Mon, 22 Oct 2018 10:57:50 +0200 Subject: [PATCH] Configurable deadlock detector interval for ingester. Value of 0 disables deadlock_detector. #issue1225 Signed-off-by: Chodor Marek --- cmd/ingester/app/builder/builder.go | 9 +-- cmd/ingester/app/consumer/consumer.go | 11 ++-- .../app/consumer/deadlock_detector.go | 66 +++++++++++++------ .../app/consumer/deadlock_detector_test.go | 50 ++++++++++++++ cmd/ingester/app/flags.go | 21 +++++- cmd/ingester/app/flags_test.go | 14 ++++ 6 files changed, 139 insertions(+), 32 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index ff6164b1d81e..454b489fad66 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -70,10 +70,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit } consumerParams := consumer.Params{ - InternalConsumer: saramaConsumer, - ProcessorFactory: *processorFactory, - Factory: metricsFactory, - Logger: logger, + InternalConsumer: saramaConsumer, + ProcessorFactory: *processorFactory, + Factory: metricsFactory, + Logger: logger, + DeadlockCheckInterval: options.DeadlockInterval, } return consumer.New(consumerParams) } diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index c939eac60c4c..aa899a061745 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -29,10 +29,11 @@ import ( // Params are the parameters of a Consumer type Params struct { - ProcessorFactory ProcessorFactory - Factory metrics.Factory - Logger *zap.Logger - InternalConsumer consumer.Consumer + ProcessorFactory ProcessorFactory + Factory metrics.Factory + Logger *zap.Logger + InternalConsumer consumer.Consumer + DeadlockCheckInterval time.Duration } // Consumer uses sarama to consume and handle messages from kafka @@ -55,7 +56,7 @@ type consumerState struct { // New is a constructor for a Consumer func New(params Params) (*Consumer, error) { - deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, time.Minute) + deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, params.DeadlockCheckInterval) return &Consumer{ metricsFactory: params.Factory, logger: params.Logger, diff --git a/cmd/ingester/app/consumer/deadlock_detector.go b/cmd/ingester/app/consumer/deadlock_detector.go index 4b5b26967424..1aa9a4ef7715 100644 --- a/cmd/ingester/app/consumer/deadlock_detector.go +++ b/cmd/ingester/app/consumer/deadlock_detector.go @@ -52,12 +52,14 @@ type partitionDeadlockDetector struct { closePartition chan struct{} done chan struct{} incrementAllPartitionMsgCount func() + closed bool } type allPartitionsDeadlockDetector struct { msgConsumed *uint64 logger *zap.Logger done chan struct{} + closed bool } func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector { @@ -87,13 +89,19 @@ func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti closePartition: make(chan struct{}, 1), done: make(chan struct{}), logger: s.logger, + closed: false, incrementAllPartitionMsgCount: func() { s.allPartitionsDeadlockDetector.incrementMsgCount() }, } - go s.monitorForPartition(w, partition) + if s.interval == 0 { + s.logger.Debug("Partition deadlock detector disabled") + w.closed = true + } else { + go s.monitorForPartition(w, partition) + } return w } @@ -135,34 +143,45 @@ func (s *deadlockDetector) start() { msgConsumed: &msgConsumed, done: make(chan struct{}), logger: s.logger, + closed: false, } - go func() { + if s.interval == 0 { + s.logger.Debug("Global deadlock detector disabled") + detector.closed = true + } else { s.logger.Debug("Starting global deadlock detector") - ticker := time.NewTicker(s.interval) - defer ticker.Stop() - - for { - select { - case <-detector.done: - s.logger.Debug("Closing global ticker routine") - return - case <-ticker.C: - if atomic.LoadUint64(detector.msgConsumed) == 0 { - s.panicFunc(-1) - return // For tests + go func() { + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + for { + select { + case <-detector.done: + s.logger.Debug("Closing global ticker routine") + return + case <-ticker.C: + if atomic.LoadUint64(detector.msgConsumed) == 0 { + s.panicFunc(-1) + return // For tests + } + atomic.StoreUint64(detector.msgConsumed, 0) } - atomic.StoreUint64(detector.msgConsumed, 0) } - } - }() + }() + } s.allPartitionsDeadlockDetector = detector } func (s *deadlockDetector) close() { - s.logger.Debug("Closing all partitions deadlock detector") - s.allPartitionsDeadlockDetector.done <- struct{}{} + if !s.allPartitionsDeadlockDetector.closed { + s.logger.Debug("Closing all partitions deadlock detector") + s.allPartitionsDeadlockDetector.closed = true + s.allPartitionsDeadlockDetector.done <- struct{}{} + } else { + s.logger.Debug("All partitions deadlock detector already closed") + } } func (s *allPartitionsDeadlockDetector) incrementMsgCount() { @@ -174,8 +193,13 @@ func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} { } func (w *partitionDeadlockDetector) close() { - w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition)) - w.done <- struct{}{} + if !w.closed { + w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition)) + w.closed = true + w.done <- struct{}{} + } else { + w.logger.Debug("Deadlock detector already closed", zap.Int32("partition", w.partition)) + } } func (w *partitionDeadlockDetector) incrementMsgCount() { diff --git a/cmd/ingester/app/consumer/deadlock_detector_test.go b/cmd/ingester/app/consumer/deadlock_detector_test.go index 0cb31fbd1bfc..f705d58d8fd2 100644 --- a/cmd/ingester/app/consumer/deadlock_detector_test.go +++ b/cmd/ingester/app/consumer/deadlock_detector_test.go @@ -112,3 +112,53 @@ func TestGlobalPanic(t *testing.T) { d.start() wg.Wait() } + +func TestNoGlobalPanicIfDeadlockDetectorDisabled(t *testing.T) { + l, _ := zap.NewDevelopment() + d := deadlockDetector{ + metricsFactory: metrics.NewLocalFactory(0), + logger: l, + interval: 0, + panicFunc: func(partition int32) { + t.Errorf("Should not panic when deadlock detector is disabled") + }, + } + + d.start() + + time.Sleep(100 * time.Millisecond) + + d.close() +} + +func TestNoPanicForPartitionIfDeadlockDetectorDisabled(t *testing.T) { + l, _ := zap.NewDevelopment() + d := deadlockDetector{ + metricsFactory: metrics.NewLocalFactory(0), + logger: l, + interval: 0, + panicFunc: func(partition int32) { + t.Errorf("Should not panic when deadlock detector is disabled") + }, + } + + w := d.startMonitoringForPartition(1) + time.Sleep(100 * time.Millisecond) + + w.close() +} + +//same as TestNoClosingSignalIfMessagesProcessedInInterval but with disabled deadlock detector +func TestApiCompatibilityWhenDeadlockDetectorDisabled(t *testing.T) { + mf := metrics.NewLocalFactory(0) + l, _ := zap.NewDevelopment() + f := newDeadlockDetector(mf, l, 0) + f.start() + defer f.close() + + w := f.startMonitoringForPartition(1) + + w.incrementMsgCount() + assert.Zero(t, len(w.closePartitionChannel())) + w.close() +} diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 2439189fcd86..7fc21a0e8aa2 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/spf13/viper" @@ -43,6 +44,8 @@ const ( SuffixParallelism = ".parallelism" // SuffixEncoding is a suffix for the encoding flag SuffixEncoding = ".encoding" + // SuffixDeadlockInterval is a suffix for deadlock detecor flag + SuffixDeadlockInterval = ".deadlockInterval" // DefaultBroker is the default kafka broker DefaultBroker = "127.0.0.1:9092" @@ -54,13 +57,16 @@ const ( DefaultParallelism = 1000 // DefaultEncoding is the default span encoding DefaultEncoding = EncodingProto + // DefaultDeadlockInterval is the default deadlock interval + DefaultDeadlockInterval = 1 * time.Minute ) // Options stores the configuration options for the Ingester type Options struct { kafkaConsumer.Configuration - Parallelism int - Encoding string + Parallelism int + Encoding string + DeadlockInterval time.Duration } // AddFlags adds flags for Builder @@ -85,6 +91,10 @@ func AddFlags(flagSet *flag.FlagSet) { ConfigPrefix+SuffixEncoding, DefaultEncoding, fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON)) + flagSet.String( + ConfigPrefix+SuffixDeadlockInterval, + DefaultDeadlockInterval.String(), + "Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.") } // InitFromViper initializes Builder with properties from viper @@ -94,4 +104,11 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding) + + d, err := time.ParseDuration(v.GetString(ConfigPrefix + SuffixDeadlockInterval)) + if err != nil { + o.DeadlockInterval = DefaultDeadlockInterval + } else { + o.DeadlockInterval = d + } } diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 84a27b1c65b7..d7fb07cc5c9d 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -16,6 +16,7 @@ package app import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -30,6 +31,7 @@ func TestOptionsWithFlags(t *testing.T) { "--ingester.brokers=127.0.0.1:9092,0.0.0:1234", "--ingester.group-id=group1", "--ingester.parallelism=5", + "--ingester.deadlockInterval=2m", "--ingester.encoding=json"}) o.InitFromViper(v) @@ -37,6 +39,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, 5, o.Parallelism) + assert.Equal(t, 2*time.Minute, o.DeadlockInterval) assert.Equal(t, EncodingJSON, o.Encoding) } @@ -51,4 +54,15 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, DefaultGroupID, o.GroupID) assert.Equal(t, DefaultParallelism, o.Parallelism) assert.Equal(t, DefaultEncoding, o.Encoding) + assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval) +} + +func TestUnparsableDeadlockIntervalFlag(t *testing.T) { + o := &Options{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--ingester.deadlockInterval=hello"}) + o.InitFromViper(v) + + assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval) }