Skip to content

Commit

Permalink
DBZ-8467 Emit traces even when no propagated trace context is found
Browse files Browse the repository at this point in the history
In the ActivateTracingSpan SMT, emitting traces when no propagated trace context is found is controlled by the
`tracing.with.context.field.only` setting. However, the if/else condition was flawed, resulting in traces not being
emitted event with this setting being set to `false`. This commit fixes the condition logic.
  • Loading branch information
VJean authored and mfvitale committed Dec 11, 2024
1 parent f8e7aee commit 63c0281
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,16 @@ public R apply(R connectRecord) {
propagatedSpanContext = after.getString(spanContextField);
}

if (propagatedSpanContext == null) {
if (requireContextField) {
return connectRecord;
}
if (propagatedSpanContext == null && requireContextField) {
return connectRecord;
}
else {
try {
return TracingSpanUtil.traceRecord(connectRecord, envelope, source, propagatedSpanContext, operationName);
}
catch (NoClassDefFoundError e) {
throw new DebeziumException("Failed to record tracing information, tracing libraries not available", e);
}

try {
return TracingSpanUtil.traceRecord(connectRecord, envelope, source, propagatedSpanContext, operationName);
}
catch (NoClassDefFoundError e) {
throw new DebeziumException("Failed to record tracing information, tracing libraries not available", e);
}
return connectRecord;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,58 @@ private TracingSpanUtil() {

}

/**
* Create tracing spans representing the write operation in the database (the span timestamp is taken
* from the source event if available), as well as Debezium's processing operation (the span timestamp
* is taken from the envelope's timestamp).
* If a trace context is provided for propagation, it is set as the parent context of the
* created spans to enable distributed tracing.
* The resulting context is injected in the Kafka Connect Record headers for further propagation.
*
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @param connectRecord the Connect record that is to be enriched with tracing information
* @param envelope the envelope wrapped by the record
* @param source the source field of the envelope, or {@code null}
* @param propagatedSpanContext a String serialization of a {@link java.util.Properties} instance representing
* the parent span context that should be used for trace propagation, or {@code null}
* @param operationName the operation name of the debezium processing span
* @return the connect record with message headers augmented with tracing information
*/
public static <R extends ConnectRecord<R>> R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext, String operationName) {
SpanBuilder txLogSpanBuilder = tracer.spanBuilder(TX_LOG_WRITE_OPERATION_NAME)
.setSpanKind(SpanKind.INTERNAL);

if (propagatedSpanContext != null) {

Properties props = PropertiesGetter.extract(propagatedSpanContext);

Context parentSpanContext = openTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), props, PropertiesGetter.INSTANCE);

SpanBuilder txLogSpanBuilder = tracer.spanBuilder(TX_LOG_WRITE_OPERATION_NAME)
.setSpanKind(SpanKind.INTERNAL)
.setParent(parentSpanContext);
txLogSpanBuilder.setParent(parentSpanContext);
}

if (source != null) {
Long eventTimestamp = source.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
if (eventTimestamp != null) {
txLogSpanBuilder.setStartTimestamp(eventTimestamp, TimeUnit.MILLISECONDS);
}
if (source != null) {
Long eventTimestamp = source.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
if (eventTimestamp != null) {
txLogSpanBuilder.setStartTimestamp(eventTimestamp, TimeUnit.MILLISECONDS);
}
}

Span txLogSpan = txLogSpanBuilder.startSpan();
Span txLogSpan = txLogSpanBuilder.startSpan();

try (Scope ignored = txLogSpan.makeCurrent()) {
if (source != null) {
for (org.apache.kafka.connect.data.Field field : source.schema().fields()) {
addFieldToSpan(txLogSpan, source, field.name(), DB_FIELDS_PREFIX);
}
try (Scope ignored = txLogSpan.makeCurrent()) {
if (source != null) {
for (org.apache.kafka.connect.data.Field field : source.schema().fields()) {
addFieldToSpan(txLogSpan, source, field.name(), DB_FIELDS_PREFIX);
}
debeziumSpan(envelope, operationName);

TextMapPropagator textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
textMapPropagator.inject(Context.current(), connectRecord.headers(), KafkaConnectHeadersSetter.INSTANCE);
}
finally {
txLogSpan.end();
}
debeziumSpan(envelope, operationName);

TextMapPropagator textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
textMapPropagator.inject(Context.current(), connectRecord.headers(), KafkaConnectHeadersSetter.INSTANCE);
}
finally {
txLogSpan.end();
}

return connectRecord;
Expand Down

0 comments on commit 63c0281

Please sign in to comment.