Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kafka] otelcol_kafka_receiver_offset_lag not calculated correctly #36093

Open
dyanneo opened this issue Oct 30, 2024 · 4 comments
Open
Labels
bug Something isn't working needs triage New item requiring triage receiver/kafka

Comments

@dyanneo
Copy link

dyanneo commented Oct 30, 2024

Component(s)

receiver/kafka

What happened?

We wanted to collect, graph, and alert on lag for the kafka receiver, but observed unexpected behavior when observing the otelcol_kafka_receiver_offset_lag's last values, compared to the values observed using kafka's consumer-groups utility.

Description

The value of last for the measurement otelcol_kafka_receiver_offset_lag does not appear to be calculated correctly.
Also, for context, we are seeing an issue in the otel collector where it keeps emitting lag metrics for partitions it's no longer consuming.

  • We understand that topic is not present as a tag in this metric, per issue 35336

Steps to Reproduce

  • We are emitting and collecting the otelcol_kafka_receiver_offset_lag metrics in our collector deployments
  • We are using grafana to view this metric
  • Upon querying for this metric's last value, we were surprised to see values that didn't correspond to kafka utility outputs.

Expected Result

Per this screen shot, the value of partition 4's lag as shown using kafka's consumer-groups.sh utility changes over time, and is in the low hundreds or smaller:

Screenshot 2024-10-30 at 10 19 19 AM

  • you can see starting and ending offsets and the calculated lag of 117, 74, 112, and 129

Actual Result

Per this screen shot, the value of partition 4's lag does not match what true lag is, per kafka's tools:

Screenshot 2024-10-30 at 10 31 03 AM

Query in Grafana query builder:

Screenshot 2024-10-30 at 10 21 27 AM

Query as Grafana is running it:

Screenshot 2024-10-30 at 10 22 00 AM

Collector version

otelcol_version: 0.109.0

Environment information

Environment

OS: linux
container

OpenTelemetry Collector configuration

---
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:PORT
  prometheus:
    config:
      scrape_configs:
        - job_name: otel-collector-metrics
          scrape_interval: 30s
          static_configs:
            - targets: ["127.0.0.1:PORT"]

processors:
  memory_limiter:
    check_interval: 1ms
    limit_percentage: 70
    spike_limit_percentage: 15

  resource:
    attributes:
      - key: application
        from_attribute: service.name
        action: insert

  resource/internal:
    attributes:
      - key: labels.application
        value: ${env:CLOUD_APPLICATION}
        action: upsert
      - key: application
        value: ${env:CLOUD_APPLICATION}
        action: upsert
      
  filter/application:
    error_mode: ignore
    traces:
      span:
        - 'resource.attributes["application"] == nil'
    metrics:
      metric:
        - 'resource.attributes["application"] == nil'

  filter/tracingsampler:
    error_mode: ignore
    traces:
      span:
        - 'attributes["sampler.parent"] == nil'

  filter/summarytype:
    error_mode: ignore
    metrics:
      metric:
        - "type == METRIC_DATA_TYPE_SUMMARY"

  metricstransform/version_correction:
    transforms:
      - match_type: strict
        include: tgt.otel.extension.heartbeat
        action: update
        operations:
          - action: update_label
            label: otel.extension.version
            value_actions:
              - value: "v0.8.0\n"
                new_value: "v0.8.0"
          - action: update_label
            label: otel.scope.version
            value_actions:
              - value: "v0.8.0\n"
                new_value: "v0.8.0"

  batch/metrics:
    timeout: 5s
    send_batch_size: 100
    send_batch_max_size: 101

  batch/traces:
    timeout: 5s
    send_batch_size: 33
    send_batch_max_size: 34

exporters:
  tgtkafkaexporter:
    brokers:
      - BROKER_REDACTED
    topic: TOPIC_REDACTED
    protocol_version: 2.7.0
    encoding_extension: tgtinflux_encoding
    producer:
      compression: snappy
      max_message_bytes: 2000000

  kafka:
    brokers:
      - BROKER_REDACTED
    topic: TOPIC_REDACTED
    protocol_version: 2.7.0
    producer:
      compression: snappy
      max_message_bytes: 2000000

service:
  pipelines:
    metrics:
      receivers:
        - otlp
      processors:
        - memory_limiter
        - resource
        - filter/application
        - filter/summarytype
        - metricstransform/version_correction
        - batch/metrics
      exporters:
        - tgtkafkaexporter

    metrics/internal:
      receivers:
        - prometheus
      processors:
        - resource
        - resource/internal
        - filter/application
        - filter/summarytype
        - metricstransform/version_correction
        - batch/metrics
      exporters:
        - tgtkafkaexporter

    traces:
      receivers:
        - otlp
      processors:
        - memory_limiter
        - resource
        - filter/application
        - filter/tracingsampler
        - batch/traces
      exporters:
        - kafka

  telemetry:
    logs:
      level: "info"
    metrics:
      address: 0.0.0.0:PORT

Log output

***WIP***

Additional context

We think the reason for the incorrect data is the fact that the gauge exists within OTEL's registry even after a rebalance and the metric is not receiving updates

Where this gauge is defined:

builder.KafkaReceiverOffsetLag, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge(
"otelcol_kafka_receiver_offset_lag",
metric.WithDescription("Current offset lag"),
metric.WithUnit("1"),
)

One of the places where this gauge is updated:

c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs))

Where this could be addressed:

func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())))
return nil

@dyanneo dyanneo added bug Something isn't working needs triage New item requiring triage labels Oct 30, 2024
Copy link
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@dyanneo dyanneo changed the title [receiver/kafka] [receiver/kafka] otelcol_kafka_receiver_offset_lag not calculated correctly Oct 30, 2024
@StephanSalas
Copy link

StephanSalas commented Oct 31, 2024

Hi @dyanneo,

Based on my initial analysis:

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
Cleanup(ConsumerGroupSession) error

We have to be potentially careful here with using Cleanup func to solve this issue. By the time Cleanup is called, the session's claims may no longer reflect the partitions that were assigned, or session.Claims() might be empty. We may need to check that assumption as well...

That said... I agree with your approach as long as we are guaranteed on Cleanup(), that the session is accurately populated, we can do something like this:

for topic, partitions := range session.claims() {
    for _, partition := range partitions {
        c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(
            attribute.String(attrInstanceName, c.id.Name()),
            attribute.String(attrTopic, topic),
            attribute.String(attrPartition, strconv.Itoa(int(partition))),
        ))
        // add cleanup for _offset_lag_ metric here as well.
    }
}

@dyanneo
Copy link
Author

dyanneo commented Oct 31, 2024

@StephanSalas Thanks for your inputs on this issue. I appreciate your concerns and the suggestion makes sense to me.

@StephanSalas
Copy link

StephanSalas commented Nov 1, 2024

According to the docs, this is the lifecycle of the serama kafka consumer framework:

// The life-cycle of a session is represented by the following steps:
//
// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
//    and is assigned their "fair share" of partitions, aka 'claims'.
// 2. Before processing starts, the handler's Setup() hook is called to notify the user
//    of the claims and allow any necessary preparation or alteration of state.
// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
//    from concurrent reads/writes.
// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
//    parent context is canceled or when a server-side rebalance cycle is initiated.
// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
//    to allow the user to perform any final tasks before a rebalance.
// 6. Finally, marked offsets are committed one last time before claims are released.

https://github.com/IBM/sarama/blob/main/consumer_group.go#L23C1-L36C87

Thus it looks likely that we can do it in Cleanup(), except for one noteable edge case:

// Please note, that once a rebalance is triggered, sessions must be completed within
// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
// commit failures.
// This method should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims.

TODO: We may need to somehow figure out this edge case... will look into it. Initial idea is to use the Errors() func channel listed here: https://github.com/IBM/sarama/blob/main/consumer_group.go#L87C2-L87C28

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working needs triage New item requiring triage receiver/kafka
Projects
None yet
Development

No branches or pull requests

2 participants