Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kafkametricsreceiver] Expand broker metrics #14167

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/expand-kafka-broker-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkametricsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "expanding the broker metrics that are scraped."

# One or more tracking issues related to the change
issues: [14166]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
119 changes: 118 additions & 1 deletion receiver/kafkametricsreceiver/broker_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

// saramaMetrics mirrors the shape returned by the sarama client's
// go-metrics registry GetAll(): metric name -> field name (e.g. "mean.rate",
// "mean", "count") -> value. Named after the sarama Kafka client library
// that produces these metrics.
type saramaMetrics map[string]map[string]interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is sarama a meaningful name here?


type brokerScraper struct {
client sarama.Client
settings receiver.CreateSettings
Expand All @@ -53,6 +55,95 @@ func (s *brokerScraper) shutdown(context.Context) error {
return nil
}

// scrapeConsumerFetch records the broker's consumer fetch rate gauge from the
// sarama metric registry snapshot, if the metric and its "mean.rate" field exist.
func (s *brokerScraper) scrapeConsumerFetch(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("consumer-fetch-rate-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if rate, ok := metric["mean.rate"].(float64); ok {
		s.mb.RecordKafkaBrokersConsumerFetchRateDataPoint(now, rate, brokerID)
	}
}

// scrapeIncomingByteRate records the broker's incoming byte rate gauge from the
// sarama metric registry snapshot, if the metric and its "mean.rate" field exist.
func (s *brokerScraper) scrapeIncomingByteRate(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("incoming-byte-rate-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if rate, ok := metric["mean.rate"].(float64); ok {
		s.mb.RecordKafkaBrokersIncomingByteRateDataPoint(now, rate, brokerID)
	}
}

// scrapeOutgoingByteRate records the broker's outgoing byte rate gauge from the
// sarama metric registry snapshot, if the metric and its "mean.rate" field exist.
func (s *brokerScraper) scrapeOutgoingByteRate(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("outgoing-byte-rate-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if rate, ok := metric["mean.rate"].(float64); ok {
		s.mb.RecordKafkaBrokersOutgoingByteRateDataPoint(now, rate, brokerID)
	}
}

// scrapeRequestRate records the broker's request rate gauge from the
// sarama metric registry snapshot, if the metric and its "mean.rate" field exist.
func (s *brokerScraper) scrapeRequestRate(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("request-rate-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if rate, ok := metric["mean.rate"].(float64); ok {
		s.mb.RecordKafkaBrokersRequestRateDataPoint(now, rate, brokerID)
	}
}

// scrapeResponseRate records the broker's response rate gauge from the
// sarama metric registry snapshot, if the metric and its "mean.rate" field exist.
func (s *brokerScraper) scrapeResponseRate(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("response-rate-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if rate, ok := metric["mean.rate"].(float64); ok {
		s.mb.RecordKafkaBrokersResponseRateDataPoint(now, rate, brokerID)
	}
}

// scrapeResponseSize records the broker's average response size gauge from the
// sarama metric registry snapshot, if the metric and its "mean" field exist.
func (s *brokerScraper) scrapeResponseSize(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("response-size-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if size, ok := metric["mean"].(float64); ok {
		s.mb.RecordKafkaBrokersResponseSizeDataPoint(now, size, brokerID)
	}
}

// scrapeRequestSize records the broker's average request size gauge from the
// sarama metric registry snapshot, if the metric and its "mean" field exist.
func (s *brokerScraper) scrapeRequestSize(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("request-size-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if size, ok := metric["mean"].(float64); ok {
		s.mb.RecordKafkaBrokersRequestSizeDataPoint(now, size, brokerID)
	}
}

// scrapeRequestsInFlight records the broker's in-flight request count from the
// sarama metric registry snapshot, if the metric and its "count" field exist.
// Note: this metric reads "count" as int64, unlike the rate/size gauges.
func (s *brokerScraper) scrapeRequestsInFlight(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("requests-in-flight-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if count, ok := metric["count"].(int64); ok {
		s.mb.RecordKafkaBrokersRequestsInFlightDataPoint(now, count, brokerID)
	}
}

// scrapeRequestLatency records the broker's average request latency gauge from
// the sarama metric registry snapshot, if the metric and its "mean" field exist.
func (s *brokerScraper) scrapeRequestLatency(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64) {
	metric, ok := allMetrics[fmt.Sprintf("request-latency-in-ms-for-broker-%d", brokerID)]
	if !ok {
		return
	}
	if latency, ok := metric["mean"].(float64); ok {
		s.mb.RecordKafkaBrokersRequestLatencyDataPoint(now, latency, brokerID)
	}
}

func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
Expand All @@ -64,7 +155,33 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {

brokers := s.client.Brokers()

s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))
allMetrics := make(map[string]map[string]interface{})

if s.saramaConfig != nil {
allMetrics = s.saramaConfig.MetricRegistry.GetAll()
}

now := pcommon.NewTimestampFromTime(time.Now())

for _, broker := range brokers {
brokerID := int64(broker.ID())
s.scrapeConsumerFetch(now, allMetrics, brokerID)
s.scrapeIncomingByteRate(now, allMetrics, brokerID)
s.scrapeOutgoingByteRate(now, allMetrics, brokerID)
s.scrapeRequestLatency(now, allMetrics, brokerID)
s.scrapeRequestRate(now, allMetrics, brokerID)
s.scrapeRequestSize(now, allMetrics, brokerID)
s.scrapeRequestsInFlight(now, allMetrics, brokerID)
s.scrapeResponseRate(now, allMetrics, brokerID)
s.scrapeResponseSize(now, allMetrics, brokerID)
}

brokerCount := int64(len(brokers))
// kafka.brokers is deprecated. This should be removed in a future release.
s.mb.RecordKafkaBrokersDataPoint(now, brokerCount)

// kafka.brokers.count should replace kafka.brokers.
s.mb.RecordKafkaBrokersCountDataPoint(now, brokerCount)

return s.mb.Emit(), nil
}
Expand Down
134 changes: 134 additions & 0 deletions receiver/kafkametricsreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,146 @@ metrics:

### kafka.brokers

[DEPRECATED] Number of brokers in the cluster.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {brokers} | Gauge | Int |

### kafka.brokers.consumer_fetch_rate

Average consumer fetch rate.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {fetches}/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.count

Number of brokers in the cluster.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {brokers} | Gauge | Int |

### kafka.brokers.incoming_byte_rate

Average incoming byte rate in bytes/second.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.outgoing_byte_rate

Average outgoing byte rate in bytes/second.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.request_latency

Average request latency in ms.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| ms | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.request_rate

Average request rate per second.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {requests}/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.request_size

Average request size in bytes

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.requests_in_flight

Requests in flight

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {requests} | Gauge | Int |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.response_rate

Average response rate per second

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {response}/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.brokers.response_size

Average response size in bytes

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### kafka.consumer_group.lag

Current approximate lag of consumer group at partition of topic
Expand Down
1 change: 1 addition & 0 deletions receiver/kafkametricsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func createMetricsReceiver(
cfg component.Config,
nextConsumer consumer.Metrics) (receiver.Metrics, error) {
c := cfg.(*Config)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

r, err := newMetricsReceiver(ctx, *c, params, nextConsumer)
if err != nil {
return nil, err
Expand Down
Loading