Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deadlock detector hack for Kafka driver instability #1087

Merged
merged 11 commits into from
Oct 9, 2018
Merged

Conversation

vprithvi
Copy link
Contributor

@vprithvi vprithvi commented Sep 26, 2018

Signed-off-by: Prithvi Raj p.r@uber.com

Which problem is this PR solving?

Short description of the changes

  • Adds a deadlock watcher per partition that monitors message consumption rate per minute. If the rate of consumption is zero, it triggers a rebalance by sending a signal to close the partition. If the partition close is unsuccessful, the instance is killed.

Signed-off-by: Prithvi Raj <p.r@uber.com>
@codecov
Copy link

codecov bot commented Sep 27, 2018

Codecov Report

❗ No coverage uploaded for pull request base (master@a429d78). Click here to learn what that means.
The diff coverage is 100%.

Impacted file tree graph

@@           Coverage Diff            @@
##             master   #1087   +/-   ##
========================================
  Coverage          ?    100%           
========================================
  Files             ?     141           
  Lines             ?    6723           
  Branches          ?       0           
========================================
  Hits              ?    6723           
  Misses            ?       0           
  Partials          ?       0
Impacted Files Coverage Δ
cmd/ingester/app/consumer/consumer.go 100% <100%> (ø)
cmd/ingester/app/consumer/deadlock_detector.go 100% <100%> (ø)
cmd/ingester/app/consumer/consumer_metrics.go 100% <100%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a429d78...4573d51. Read the comment docs.

@ghost ghost assigned black-adder Oct 1, 2018
cmd/ingester/app/consumer/consumer.go Show resolved Hide resolved
buf := make([]byte, 1<<20)
logger.Panic("No messages processed in the last check interval",
zap.Int32("partition", partition),
zap.String("stack", string(buf[:runtime.Stack(buf, true)])))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in hotrod zap logger automatically prints the stack. What's different here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zap only prints the current go routine - This prints out all go routines.

func (s *seppukuFactory) startMonitoringForPartition(partition int32) *seppukuWorker {
var msgConsumed uint64
w := &seppukuWorker{
msgConsumed: &msgConsumed,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no reason to allocate a pointer, you can still use address of a struct field with atomic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I had assumed that the pointer allocation had a similar overhead to an field allocation. I didn't want to use & on every access.

case w.closePartition <- struct{}{}:
s.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition))
default:
// If closePartition is blocked, attempt seppuku
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// If closePartition is blocked, the consumer may have deadlocked -> kill the process

}

func (w *seppukuWorker) close() {
w.ticker.Stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could inadvertently kill the process after rebalance, but not sure how to avoid it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate how?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, because Ticker.close() does not close the channel, the race condition I was thinking about won't happen on rebalance, but you will leak the goroutine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true, I can use a separate channel to close this if you feel strongly about it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, let's fix this, a goroutine leak is not good

Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
@@ -52,12 +56,15 @@ type consumerState struct {

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
deadlockDetectorFactory := newDeadlockDetectorFactory(params.Factory, params.Logger, time.Minute)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate issue: s/params.Factory/params.MetricsFactory/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll address this separately - it'll only add noise to this PR


msgProcessor.Process(&saramaMessageWrapper{msg})
case <-deadlockDetector.getClosePartition():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getClosePartition/closePartitionChannel/

}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.newPartitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we call new here? If it internally caches the metrics, then s/newPartitionMetrics/newPartitionMetrics

closeCounter metrics.Counter
}

func (c *Consumer) getNamespace(partition int32) metrics.Factory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getNamespace/metricsFactoryForPartition/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: we don't use "get" in Go.

msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
deadlockDetector.incrementMsgCount()
c.allPartitionDeadlockDetector.incrementMsgCount()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow what the purpose of this allPartitionDeadlockDetector is. It looks like it's only used to increment this counter - why do we even need it? We can always sum the time series to get total counter.

var msgConsumed uint64
w := &deadlockDetector{
msgConsumed: &msgConsumed,
ticker: time.NewTicker(s.interval),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you do not need to leak the ticker. Create it inside goroutine as local var with defer close.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -42,6 +43,9 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetectorFactory deadlockDetectorFactory
allPartitionDeadlockDetector *deadlockDetector
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: allPartitions...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -42,6 +43,9 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetectorFactory deadlockDetectorFactory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be cleaner and easier to understand if you had a top-level deadlockDetector, which can create partitionDeadlockDetector as needed (implementation-wide the former may contain the latter for pId=-1). So factory is only used once to create the top-level detector, and the factory does not need to be stored in the Consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this mean that the top level deadlockDetector also has the responsibilities of the Factor? (That being said, I think it might be cleaner design)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does, in a way, but there's nothing wrong with that, especially considering that it happens at runtime and many times, whereas the top-level factory will be used only once on startup and not needed afterwards.

Separating top-level from individual detectors will also allow clean separation in some implementation details, e.g. where some features are not used.

And best of all, you'll be able to move a lot of methods into detectors, away from Consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

msgConsumed *uint64
ticker *time.Ticker
logger *zap.Logger
closePartition chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this does not apply to all-partitions detector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not

Signed-off-by: Prithvi Raj <p.r@uber.com>
f := c.namespace(partition)
return partitionMetrics{
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still bothers me. Some metrics factories are not happy if you try to create a metric with the same name twice. So if we re-acquire the same partition, this could cause a panic, e.g. if someone is using expvar-based metrics (unless we implemented protection in the factory, which I had to do for Prometheus).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test to jaeger-lib/metrics which shows that calling Counter mutiple times with the same tags does not panic for expvar, prometheus and tally.

@@ -210,3 +216,29 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {

t.Fail()
}

func TestHandleClosePartition(t *testing.T) {
localFactory := metrics.NewLocalFactory(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/localFactory/metricsFactory/

done chan struct{}
}

func newDeadlockDetector(factory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there some pattern you're following from elsewhere in the code? Referring to metrics factory as merely "factory" is very poor naming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is a result of some laziness and accepting the IDE generated name - I'll update these

cmd/ingester/app/consumer/deadlock_detector.go Outdated Show resolved Hide resolved
cmd/ingester/app/consumer/deadlock_detector.go Outdated Show resolved Hide resolved

func newDeadlockDetector(factory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
panicFunc := func(partition int32) {
factory.Counter("deadlockdetector.panic-issued", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assumes that factory is reentrant for the same metric name, which is not guaranteed

cmd/ingester/app/consumer/consumer.go Show resolved Hide resolved
@yurishkuro yurishkuro changed the title Suicide hack for Kafka driver instability Deadlock detector hack for Kafka driver instability Oct 6, 2018
Signed-off-by: Prithvi Raj <p.r@uber.com>
@@ -30,8 +30,17 @@ type errMetrics struct {
errCounter metrics.Counter
}

type partitionMetrics struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should clean-up these metrics. All 3 structs are per-partition, could we not combine them into one? Then we won't have all these small functions on the Consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree - I'll do it as a separate commit

cmd/ingester/app/consumer/deadlock_detector.go Outdated Show resolved Hide resolved
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
@vprithvi vprithvi merged commit 7105fa9 into master Oct 9, 2018
@ghost ghost removed the review label Oct 9, 2018
@vprithvi vprithvi deleted the seppuku branch October 9, 2018 15:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants