Skip to content

Commit

Permalink
Use names specified by OpenTelemetry for tags (#276)
Browse files Browse the repository at this point in the history
See eclipse-vertx/vertx-tracing#69

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont authored Sep 27, 2024
1 parent a453f68 commit f82fd40
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
public class ConsumerTracer<S> {
private final VertxTracer<S, Void> tracer;
private final String address;
private final String hostname;
private final String port;
private final TracingPolicy policy;

Expand Down Expand Up @@ -71,7 +70,6 @@ public static <S> ConsumerTracer create(VertxTracer tracer, KafkaClientOptions o
private ConsumerTracer(VertxTracer<S, Void> tracer, TracingPolicy policy, String bootstrapServer) {
this.tracer = tracer;
this.address = bootstrapServer;
this.hostname = Utils.getHost(bootstrapServer);
Integer port = Utils.getPort(bootstrapServer);
this.port = port == null ? null : port.toString();
this.policy = policy;
Expand All @@ -86,7 +84,7 @@ private static Iterable<Map.Entry<String, String>> convertHeaders(Headers header
}

public StartedSpan prepareMessageReceived(Context context, ConsumerRecord rec) {
TraceContext tc = new TraceContext("consumer", address, hostname, port, rec.topic());
TraceContext tc = new TraceContext("consumer", address, port, rec.topic());
S span = tracer.receiveRequest(context, SpanKind.MESSAGING, policy, tc, "kafka_receive", convertHeaders(rec.headers()), TraceTags.TAG_EXTRACTOR);
return new StartedSpan(span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
public class ProducerTracer<S> {
private final VertxTracer<Void, S> tracer;
private final String address;
private final String hostname;
private final String port;
private final TracingPolicy policy;

Expand Down Expand Up @@ -65,14 +64,13 @@ public static <S> ProducerTracer create(VertxTracer tracer, KafkaClientOptions o
private ProducerTracer(VertxTracer<Void, S> tracer, TracingPolicy policy, String bootstrapServer) {
this.tracer = tracer;
this.address = bootstrapServer;
this.hostname = Utils.getHost(bootstrapServer);
Integer port = Utils.getPort(bootstrapServer);
this.port = port == null ? null : port.toString();
this.policy = policy;
}

public StartedSpan prepareSendMessage(Context context, ProducerRecord record) {
TraceContext tc = new TraceContext("producer", address, hostname, port, record.topic());
TraceContext tc = new TraceContext("producer", address, port, record.topic());
S span = tracer.sendRequest(context, SpanKind.MESSAGING, policy, tc, "kafka_send", (k, v) -> record.headers().add(k, v.getBytes()), TraceTags.TAG_EXTRACTOR);
return new StartedSpan(span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
class TraceContext {
final String kind;
final String address;
final String hostname;
final String port;
final String topic;

TraceContext(String kind, String address, String hostname, String port, String topic) {
TraceContext(String kind, String address, String port, String topic) {
this.kind = kind;
this.address = address;
this.hostname = hostname;
this.port = port;
this.topic = topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,25 @@
* Tags for Kafka Tracing
*/
public enum TraceTags {
// See https://github.com/opentracing/specification/blob/master/semantic_conventions.md
PEER_ADDRESS("peer.address", q -> q.address),
PEER_HOSTNAME("peer.hostname", q -> q.hostname),
PEER_PORT("peer.port", q -> q.port),
// See https://opentelemetry.io/docs/specs/semconv/messaging/kafka/
SERVER_ADDRESS("server.address", q -> q.address),
SERVER_PORT("server.port", q -> q.port),
PEER_SERVICE("peer.service", q -> "kafka"),
BUS_DESTINATION("message_bus.destination", q -> q.topic);
BUS_DESTINATION("messaging.destination.name", q -> q.topic);

static final TagExtractor<TraceContext> TAG_EXTRACTOR = new TagExtractor<TraceContext>() {
static final TagExtractor<TraceContext> TAG_EXTRACTOR = new TagExtractor<>() {
private final TraceTags[] TAGS = TraceTags.values();

@Override
public int len(TraceContext obj) {
return TAGS.length;
}

@Override
public String name(TraceContext obj, int index) {
return TAGS[index].name;
}

@Override
public String value(TraceContext obj, int index) {
return TAGS[index].fn.apply(obj);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -254,10 +252,9 @@ public <R> String receiveRequest(Context context, SpanKind kind, TracingPolicy p
context.putLocal(CONTEXT_CONSUMER_SPAN, "span-" + receivedCount.intValue());
Map<String, String> tags = tagExtractor.extract(request);
ctx.assertEquals("kafka_receive", operation);
ctx.assertEquals(peerAddress, tags.get("peer.address"));
ctx.assertEquals(host, tags.get("peer.hostname"));
ctx.assertEquals(port, tags.get("peer.port"));
ctx.assertEquals(topic, tags.get("message_bus.destination"));
ctx.assertEquals(peerAddress, tags.get("server.address"));
ctx.assertEquals(port, tags.get("server.port"));
ctx.assertEquals(topic, tags.get("messaging.destination.name"));
ctx.assertEquals("kafka", tags.get("peer.service"));
done.countDown();
return "SPAN-CONSUMER";
Expand All @@ -279,10 +276,9 @@ public <R> String sendRequest(Context context, SpanKind kind, TracingPolicy poli
ctx.assertEquals(SpanKind.MESSAGING, kind);
Map<String, String> tags = tagExtractor.extract(request);
ctx.assertEquals("kafka_send", operation);
ctx.assertEquals(peerAddress, tags.get("peer.address"));
ctx.assertEquals(host, tags.get("peer.hostname"));
ctx.assertEquals(port, tags.get("peer.port"));
ctx.assertEquals(topic, tags.get("message_bus.destination"));
ctx.assertEquals(peerAddress, tags.get("server.address"));
ctx.assertEquals(port, tags.get("server.port"));
ctx.assertEquals(topic, tags.get("messaging.destination.name"));
ctx.assertEquals("kafka", tags.get("peer.service"));
done.countDown();
return "SPAN-PRODUCER";
Expand Down

0 comments on commit f82fd40

Please sign in to comment.