Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a metric for number of partitions held #1154

Merged
merged 2 commits into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add a metric for number of partitions held
ref #1126

Signed-off-by: Prithvi Raj <p.r@uber.com>
  • Loading branch information
vprithvi committed Oct 30, 2018
commit 41be5b8536ebf94062f036dd4f92d2b2aa0b10e1
4 changes: 4 additions & 0 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Consumer struct {
deadlockDetector deadlockDetector

partitionIDToState map[int32]*consumerState
partitionsHeld metrics.Counter
}

type consumerState struct {
Expand All @@ -63,6 +64,7 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeld: partitionsHeld(params.MetricsFactory),
}, nil
}

Expand Down Expand Up @@ -100,6 +102,8 @@ func (c *Consumer) Close() error {

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionsHeld.Inc(1)
defer c.partitionsHeld.Inc(-1)
c.partitionIDToState[pc.Partition()].wg.Add(1)
defer c.partitionIDToState[pc.Partition()].wg.Done()
defer c.closePartition(pc)
Expand Down
8 changes: 7 additions & 1 deletion cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/uber/jaeger-lib/metrics"
)

const consumerNamespace = "sarama-consumer"

type msgMetrics struct {
counter metrics.Counter
offsetGauge metrics.Gauge
Expand All @@ -36,7 +38,7 @@ type partitionMetrics struct {
}

func (c *Consumer) namespace(partition int32) metrics.Factory {
return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return c.metricsFactory.Namespace(consumerNamespace, map[string]string{"partition": strconv.Itoa(int(partition))})
}

func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
Expand All @@ -58,3 +60,7 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}
}

func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
return metricsFactory.Namespace(consumerNamespace, nil).Counter("partitions-held", nil)
}
21 changes: 16 additions & 5 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
)

func TestConstructor(t *testing.T) {
newConsumer, err := New(Params{})
newConsumer, err := New(Params{MetricsFactory: metrics.NullFactory})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}
Expand Down Expand Up @@ -83,23 +83,24 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer)
}

func newConsumer(
factory metrics.Factory,
metricsFactory metrics.Factory,
topic string,
processor processor.SpanProcessor,
consumer consumer.Consumer) *Consumer {

logger, _ := zap.NewDevelopment()
return &Consumer{
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
deadlockDetector: newDeadlockDetector(factory, logger, time.Second),
partitionsHeld: partitionsHeld(metricsFactory),
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),

processorFactory: ProcessorFactory{
topic: topic,
consumer: consumer,
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: processor,
parallelism: 1,
Expand Down Expand Up @@ -152,12 +153,22 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
mc.YieldMessage(msg)
isProcessed.Wait()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 1,
})

mp.AssertExpectations(t)
// Ensure that the partition consumer was updated in the map
assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(),
undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset())
undertest.Close()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 0,
})

partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.messages",
Expand Down