Deadlock detector hack for Kafka driver instability #1087
Conversation
Signed-off-by: Prithvi Raj <p.r@uber.com>
Codecov Report
@@            Coverage Diff            @@
##             master     #1087   +/-   ##
==========================================
  Coverage          ?      100%
==========================================
  Files             ?       141
  Lines             ?      6723
  Branches          ?         0
==========================================
  Hits              ?      6723
  Misses            ?         0
  Partials          ?         0
Continue to review full report at Codecov.
cmd/ingester/app/consumer/sepukku.go
Outdated
    buf := make([]byte, 1<<20)
    logger.Panic("No messages processed in the last check interval",
        zap.Int32("partition", partition),
        zap.String("stack", string(buf[:runtime.Stack(buf, true)])))
in hotrod the zap logger automatically prints the stack. What's different here?
zap only prints the current goroutine's stack - this prints out all goroutines.
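To make the distinction concrete, here is a minimal sketch (not code from this PR; the logger wiring is illustrative): zap attaches only the calling goroutine's stack to a log entry, whereas runtime.Stack with all=true captures every goroutine in the process, which is what you want when the deadlock is somewhere else.

package main

import (
    "runtime"

    "go.uber.org/zap"
)

// dumpAllStacks logs the stacks of all goroutines, not just the caller's.
func dumpAllStacks(logger *zap.Logger) {
    buf := make([]byte, 1<<20) // 1 MiB is usually enough for a full dump
    n := runtime.Stack(buf, true)
    logger.Warn("goroutine dump", zap.String("stack", string(buf[:n])))
}

func main() {
    logger, _ := zap.NewDevelopment()
    dumpAllStacks(logger)
}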
cmd/ingester/app/consumer/sepukku.go
Outdated
func (s *seppukuFactory) startMonitoringForPartition(partition int32) *seppukuWorker {
    var msgConsumed uint64
    w := &seppukuWorker{
        msgConsumed: &msgConsumed,
no reason to allocate a pointer, you can still use address of a struct field with atomic.
True, I had assumed that the pointer allocation had a similar overhead to a field allocation. I didn't want to use & on every access.
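A small sketch of the suggestion, using an illustrative worker type rather than the PR's actual one: the counter stays a plain uint64 field and sync/atomic operates on its address, so no separate allocation or *uint64 field is needed (keeping it the first field also preserves 64-bit alignment on 32-bit platforms).

package consumer

import "sync/atomic"

type worker struct {
    msgConsumed uint64 // first field keeps it 64-bit aligned; access only via sync/atomic
}

func (w *worker) incrementMsgCount() {
    atomic.AddUint64(&w.msgConsumed, 1)
}

// sawActivity reports whether any messages arrived since the last check,
// resetting the counter for the next interval.
func (w *worker) sawActivity() bool {
    return atomic.SwapUint64(&w.msgConsumed, 0) > 0
}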
cmd/ingester/app/consumer/sepukku.go
Outdated
    case w.closePartition <- struct{}{}:
        s.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition))
    default:
        // If closePartition is blocked, attempt seppuku
// If closePartition is blocked, the consumer may have deadlocked -> kill the process
cmd/ingester/app/consumer/sepukku.go
Outdated
}

func (w *seppukuWorker) close() {
    w.ticker.Stop()
this could inadvertently kill the process after rebalance, but not sure how to avoid it
Could you elaborate how?
actually, because Ticker.Stop() does not close the channel, the race condition I was thinking about won't happen on rebalance, but you will leak the goroutine
This is true, I can use a separate channel to close this if you feel strongly about it
yes, let's fix this, a goroutine leak is not good
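A hedged sketch of the fix being agreed on here (field and method names are illustrative, not the PR's final code): give the worker its own done channel so close() can actually unblock the monitoring goroutine, since Ticker.Stop() never closes the ticker's channel.

package consumer

import "time"

type seppukuWorker struct {
    ticker *time.Ticker
    done   chan struct{}
}

func (w *seppukuWorker) monitor(check func()) {
    go func() {
        for {
            select {
            case <-w.ticker.C:
                check()
            case <-w.done: // close() signals here; without it this goroutine leaks
                return
            }
        }
    }()
}

func (w *seppukuWorker) close() {
    w.ticker.Stop() // stops ticks but does not close ticker.C
    close(w.done)   // this is what lets the goroutine exit
}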
Signed-off-by: Prithvi Raj <p.r@uber.com>
@@ -52,12 +56,15 @@ type consumerState struct {

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
    deadlockDetectorFactory := newDeadlockDetectorFactory(params.Factory, params.Logger, time.Minute)
separate issue: s/params.Factory/params.MetricsFactory/
I'll address this separately - it'll only add noise to this PR
        msgProcessor.Process(&saramaMessageWrapper{msg})
    case <-deadlockDetector.getClosePartition():
s/getClosePartition/closePartitionChannel/
}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
    c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
    partitionConsumer.Close() // blocks until messages channel is drained
    c.newPartitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
why do we call new here? If it internally caches the metrics, then the "new" prefix in newPartitionMetrics is misleading.
    closeCounter metrics.Counter
}

func (c *Consumer) getNamespace(partition int32) metrics.Factory {
s/getNamespace/metricsFactoryForPartition/
NB: we don't use "get" in Go.
    msgMetrics.offsetGauge.Update(msg.Offset)
    msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
    deadlockDetector.incrementMsgCount()
    c.allPartitionDeadlockDetector.incrementMsgCount()
I don't follow what the purpose of this allPartitionDeadlockDetector is. It looks like it's only used to increment this counter - why do we even need it? We can always sum the time series to get total counter.
    var msgConsumed uint64
    w := &deadlockDetector{
        msgConsumed: &msgConsumed,
        ticker:      time.NewTicker(s.interval),
nit: you do not need to leak the ticker out of the goroutine. Create it inside the goroutine as a local var with a deferred Stop.
👍
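Roughly what the nit amounts to, sketched with illustrative names and assuming the done channel shown later in this thread: the ticker becomes a local variable inside the goroutine and is stopped with defer, so nothing ticker-related needs to live on the struct.

package consumer

import "time"

type deadlockDetector struct {
    done chan struct{}
}

func (d *deadlockDetector) startMonitoring(interval time.Duration, check func()) {
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop() // stopped when the goroutine returns; no field to clean up elsewhere
        for {
            select {
            case <-ticker.C:
                check()
            case <-d.done:
                return
            }
        }
    }()
}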
@@ -42,6 +43,9 @@ type Consumer struct {
    internalConsumer consumer.Consumer
    processorFactory ProcessorFactory

    deadlockDetectorFactory     deadlockDetectorFactory
    allPartitionDeadlockDetector *deadlockDetector
nit: allPartitions...
👍
@@ -42,6 +43,9 @@ type Consumer struct {
    internalConsumer consumer.Consumer
    processorFactory ProcessorFactory

    deadlockDetectorFactory deadlockDetectorFactory
I think it would be cleaner and easier to understand if you had a top-level deadlockDetector, which can create partitionDeadlockDetector instances as needed (implementation-wise the former may contain the latter for pId=-1). So the factory is only used once, to create the top-level detector, and it does not need to be stored in the Consumer.
Doesn't this mean that the top-level deadlockDetector also has the responsibilities of the factory? (That being said, I think it might be a cleaner design.)
it does, in a way, but there's nothing wrong with that, especially considering that it happens at runtime and many times, whereas the top-level factory will be used only once on startup and not needed afterwards.
Separating top-level from individual detectors will also allow clean separation in some implementation details, e.g. where some features are not used.
And best of all, you'll be able to move a lot of methods into detectors, away from Consumer.
👍
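For the record, a rough outline of the shape being proposed here (names are hypothetical and the bodies are stubs, not the code that ended up in the PR): one top-level detector built in New(), handing out per-partition detectors, so the Consumer stores neither a factory nor a separate all-partitions detector.

package consumer

import (
    "time"

    "github.com/uber/jaeger-lib/metrics"
    "go.uber.org/zap"
)

// deadlockDetector is created once at startup and owns the shared config.
type deadlockDetector struct {
    metricsFactory metrics.Factory
    logger         *zap.Logger
    interval       time.Duration
}

// partitionDeadlockDetector watches a single partition (pId = -1 could stand for all partitions).
type partitionDeadlockDetector struct {
    msgConsumed    uint64
    closePartition chan struct{}
    done           chan struct{}
}

func (d *deadlockDetector) startMonitoringForPartition(partition int32) *partitionDeadlockDetector {
    w := &partitionDeadlockDetector{
        closePartition: make(chan struct{}, 1),
        done:           make(chan struct{}),
    }
    // launch the per-partition monitoring goroutine here, using d.interval,
    // d.logger and d.metricsFactory
    return w
}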
    msgConsumed    *uint64
    ticker         *time.Ticker
    logger         *zap.Logger
    closePartition chan struct{}
I assume this does not apply to all-partitions detector?
It does not
Signed-off-by: Prithvi Raj <p.r@uber.com>
    f := c.namespace(partition)
    return partitionMetrics{
        closeCounter: f.Counter("partition-close", nil),
        startCounter: f.Counter("partition-start", nil)}
This still bothers me. Some metrics factories are not happy if you try to create a metric with the same name twice. So if we re-acquire the same partition, this could cause a panic, e.g. if someone is using expvar-based metrics (unless we implemented protection in the factory, which I had to do for Prometheus).
I added a test to jaeger-lib/metrics which shows that calling Counter multiple times with the same tags does not panic for expvar, prometheus and tally.
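For context, the check amounts to something like the following sketch, written here against the local test factory already used in this PR (the jaeger-lib test also exercises the expvar, prometheus and tally factories): acquiring the same counter twice with an identical name and tags must not panic.

package consumer

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/uber/jaeger-lib/metrics"
)

func TestCounterIsReentrant(t *testing.T) {
    metricsFactory := metrics.NewLocalFactory(0)
    tags := map[string]string{"partition": "1"}
    assert.NotPanics(t, func() {
        // same metric name and tags acquired twice
        metricsFactory.Counter("partition-close", tags).Inc(1)
        metricsFactory.Counter("partition-close", tags).Inc(1)
    })
}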
@@ -210,3 +216,29 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {

    t.Fail()
}

func TestHandleClosePartition(t *testing.T) {
    localFactory := metrics.NewLocalFactory(0)
s/localFactory/metricsFactory/
    done chan struct{}
}

func newDeadlockDetector(factory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
is there some pattern you're following from elsewhere in the code? Referring to metrics factory as merely "factory" is very poor naming.
Ah, this is a result of some laziness and accepting the IDE generated name - I'll update these
func newDeadlockDetector(factory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
    panicFunc := func(partition int32) {
        factory.Counter("deadlockdetector.panic-issued", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)
assumes that factory is reentrant for the same metric name, which is not guaranteed
Signed-off-by: Prithvi Raj <p.r@uber.com>
@@ -30,8 +30,17 @@ type errMetrics struct {
    errCounter metrics.Counter
}

type partitionMetrics struct {
nit: we should clean-up these metrics. All 3 structs are per-partition, could we not combine them into one? Then we won't have all these small functions on the Consumer.
I agree - I'll do it as a separate commit
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
Which problem is this PR solving?
Short description of the changes