Confluent.Kafka.Extensions.OpenTelemetry

The Confluent.Kafka.Extensions.OpenTelemetry package lets OpenTelemetry collect instrumentation data from the Confluent.Kafka library. The instrumentation itself must be configured with Confluent.Kafka.Extensions.Diagnostics.

Installation

Install-Package Confluent.Kafka.Extensions.OpenTelemetry

Usage

Confluent.Kafka configuration

Because Confluent.Kafka does not expose any instrumentation data on its own, additional configuration is required. Full documentation is available in the Confluent.Kafka.Extensions.Diagnostics docs. There is also an example of how to use the package in a real-world application.

Producer

using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;


using var producer =
    new ProducerBuilder<Null, string>(new ProducerConfig(new ClientConfig { BootstrapServers = "localhost:9092" }))
        .SetKeySerializer(Serializers.Null)
        .SetValueSerializer(Serializers.Utf8)
        .BuildWithInstrumentation();

await producer.ProduceAsync("topic", new Message<Null, string> { Value = "Hello World!" });

Consumer

using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;

using var consumer = new ConsumerBuilder<Ignore, string>(
        new ConsumerConfig(new ClientConfig { BootstrapServers = "localhost:9092" })
        {
            GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
        })
    .SetValueDeserializer(Deserializers.Utf8)
    .Build();

consumer.Subscribe("topic");

consumer.ConsumeWithInstrumentation((result) =>
{
    Console.WriteLine(result.Message.Value);
});

OpenTelemetry configuration

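using System.Collections.Generic;
using System.Diagnostics;
using Confluent.Kafka.Extensions.OpenTelemetry;
using OpenTelemetry.Trace;

// `builder` is the application's host builder (for example, the result of WebApplication.CreateBuilder(args)).
// AddInMemoryExporter needs a collection in which to store the exported activities.
var exportedActivities = new List<Activity>();

builder.Services.AddOpenTelemetry().WithTracing(traceBuilder =>
{
    traceBuilder
        .AddInMemoryExporter(exportedActivities)
        .AddHttpClientInstrumentation()
        .AddAspNetCoreInstrumentation()
        .AddConfluentKafkaInstrumentation();  // <-- Add Confluent.Kafka OpenTelemetry support
});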
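
The snippet above assumes an ASP.NET Core host. For a plain console application, the same registration can probably be done through the OpenTelemetry SDK directly. The following is a minimal sketch, not the package's documented setup: it assumes the OpenTelemetry and OpenTelemetry.Exporter.Console packages are installed, and it calls AddConfluentKafkaInstrumentation as a separate statement so that it does not depend on that method's return type.

```csharp
using Confluent.Kafka.Extensions.OpenTelemetry;
using OpenTelemetry;
using OpenTelemetry.Trace;

// Create a standalone tracer provider builder (no dependency injection or web host).
// AddConsoleExporter comes from the OpenTelemetry.Exporter.Console package (assumed to be installed).
var tracerProviderBuilder = Sdk.CreateTracerProviderBuilder()
    .AddConsoleExporter();

// Register the Confluent.Kafka instrumentation, as in the ASP.NET Core example.
tracerProviderBuilder.AddConfluentKafkaInstrumentation();

// Build the provider and keep it alive for the lifetime of the application;
// disposing it flushes and shuts down the exporter.
using var tracerProvider = tracerProviderBuilder.Build();

// Producers built with BuildWithInstrumentation() and consumers using
// ConsumeWithInstrumentation(...) should be created and used after this point.
```

With the console exporter, each produced or consumed message should appear as an activity written to standard output, which gives a quick way to check that the instrumentation is wired up before switching to a real exporter.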