
Use raw kafka tracing for spring-kafka based interactions #1670

@codefromthecrypt

Description


Is your feature request related to a problem? Please describe.
We routinely get support requests and issues about Kafka naming conventions. This is compounded by the fact that kafka-clients instrumentation is used in some places, while in others spring-messaging instrumentation is used with some hacks (e.g. TracingChannelInterceptor). Because things work at different abstractions, certain data can't be used in tagging and naming policy.

For example, SleuthKafkaAspect is tested to use kafka-clients instrumentation when a spring-kafka ProducerFactory is autowired.

However, kafka-clients instrumentation is not used when StreamBridge produces a message. Instead, a somewhat fragile spring-messaging variant is used, which relies on headers to guess whether the destination is Kafka. As far as I know, Spring Cloud Stream (SCS) can't use kafka-clients instrumentation because of how it initializes: DefaultKafkaProducerFactory.createRawProducer would need to be wrapped by kafka-clients instrumentation, but the call path from BindingService.bindProducer cannot be intercepted (internally, it isn't Spring-managed). Moreover, since the Kafka producer is literally new'ed up, there's no clean way I'm aware of to fix this.

Describe the solution you'd like
I'd like spring-messaging instrumentation to opt out whenever kafka-clients (or amqp-client, for that matter!) instrumentation is in use. Part of that would be filling the gaps that currently prevent this. For example, I'm not aware of a way, besides using an agent, to overcome this in DefaultKafkaProducerFactory:

	protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
		return new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
	}
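One way such a gap could be filled without an agent, sketched with hypothetical stand-in types (not the real spring-kafka or kafka-clients API): the factory still constructs the client with `new`, but applies any registered hooks to the raw instance before handing it out. `Producer`, `RawProducer`, `HookableProducerFactory`, and `addRawProducerHook` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class HookDemo {
    public static void main(String[] args) {
        HookableProducerFactory<String, String> factory = new HookableProducerFactory<>();
        // A tracing library would register its wrapper here; this hook only marks the result.
        factory.addRawProducerHook(raw -> (topic, key, value) -> "traced(" + raw.send(topic, key, value) + ")");
        System.out.println(factory.createProducer().send("orders", "k", "v")); // traced(sent to orders)
    }
}

// Hypothetical stand-in for the kafka-clients Producer interface (not the real API).
interface Producer<K, V> {
    String send(String topic, K key, V value);
}

// Hypothetical stand-in for KafkaProducer, the client that gets new'ed up.
class RawProducer<K, V> implements Producer<K, V> {
    @Override
    public String send(String topic, K key, V value) {
        return "sent to " + topic;
    }
}

// Hypothetical factory: it constructs the client directly, then applies every
// registered hook to the raw instance, so no agent or AOP is needed to wrap it.
class HookableProducerFactory<K, V> {
    private final List<UnaryOperator<Producer<K, V>>> hooks = new ArrayList<>();

    void addRawProducerHook(UnaryOperator<Producer<K, V>> hook) {
        hooks.add(hook);
    }

    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        Producer<K, V> result = new RawProducer<>();
        for (UnaryOperator<Producer<K, V>> hook : hooks) {
            result = hook.apply(result);
        }
        return result;
    }

    Producer<K, V> createProducer() {
        return createRawProducer(Map.of());
    }
}
```

The hook sees the raw client regardless of which higher-level API triggered `createProducer`, which is the property the issue is after.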

At the end of the day, if any of the several APIs (spring-cloud-stream, spring-messaging, spring-integration, spring-kafka) ends up calling kafka-clients, tracing should apply directly to the Kafka implementation of Producer, not to the various wrappers above it. In other words, I'd like the same effect as the following, either via a new type or some aspect magic:

	protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
		Producer<K, V> result = new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
		return kafkaTracing.producer(result); // << one wrap to rule them all!
	}
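The "new type" variant can be modeled with hypothetical stand-ins (none of these are the real spring-kafka, kafka-clients, or Brave classes): a subclass overrides the protected `createRawProducer` and wraps the raw client once, so whichever caller obtains a producer from the factory, the wrapper sees the real topic. `TracingProducer` plays the role of whatever `kafkaTracing.producer(...)` returns, and `StreamBridgeLike`/`TemplateLike` are illustrative callers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OneWrapDemo {
    public static void main(String[] args) {
        List<String> spans = new ArrayList<>();
        ProducerFactory<String, String> factory = new TracingProducerFactory<>(spans);
        // Two different higher-level callers obtain producers from the same factory...
        new StreamBridgeLike(factory).send("orders", "hello");
        new TemplateLike(factory).send("payments", "k1", "world");
        // ...and the single wrapper saw both sends, named by the real topic.
        System.out.println(spans); // [send orders, send payments]
    }
}

// Hypothetical stand-in for the kafka-clients Producer interface.
interface Producer<K, V> {
    void send(String topic, K key, V value);
}

// Hypothetical stand-in for KafkaProducer.
class RawProducer<K, V> implements Producer<K, V> {
    @Override
    public void send(String topic, K key, V value) {
        // A real client would serialize and transmit the record here.
    }
}

// Hypothetical stand-in for DefaultKafkaProducerFactory, which news up the client itself.
class ProducerFactory<K, V> {
    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        return new RawProducer<>();
    }

    public Producer<K, V> createProducer() {
        return createRawProducer(Map.of());
    }
}

// Plays the role of the wrapper returned by kafkaTracing.producer(...).
class TracingProducer<K, V> implements Producer<K, V> {
    private final Producer<K, V> delegate;
    private final List<String> spans;

    TracingProducer(Producer<K, V> delegate, List<String> spans) {
        this.delegate = delegate;
        this.spans = spans;
    }

    @Override
    public void send(String topic, K key, V value) {
        spans.add("send " + topic); // the real topic is visible at this layer
        delegate.send(topic, key, value);
    }
}

// "A new type": override the protected method so every caller gets a wrapped client.
class TracingProducerFactory<K, V> extends ProducerFactory<K, V> {
    private final List<String> spans;

    TracingProducerFactory(List<String> spans) { this.spans = spans; }

    @Override
    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        Producer<K, V> result = super.createRawProducer(rawConfigs);
        return new TracingProducer<>(result, spans); // one wrap to rule them all
    }
}

// Hypothetical callers standing in for StreamBridge and a template-style API.
class StreamBridgeLike {
    private final ProducerFactory<String, String> factory;

    StreamBridgeLike(ProducerFactory<String, String> factory) { this.factory = factory; }

    void send(String destination, String payload) {
        // Simplification: the destination is used directly as the topic.
        factory.createProducer().send(destination, null, payload);
    }
}

class TemplateLike {
    private final ProducerFactory<String, String> factory;

    TemplateLike(ProducerFactory<String, String> factory) { this.factory = factory; }

    void send(String topic, String key, String value) {
        factory.createProducer().send(topic, key, value);
    }
}
```

Note that this only helps where the application controls which factory class gets instantiated; in the StreamBridge path described above, the binder creates the factory itself.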

This might require a change here, and possibly also in spring-kafka, to control createRawProducer regardless of the call site. It may also require some registry that records which channels are "claimed" so that the spring-messaging instrumentation overhead is avoided for them.
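A minimal sketch of such a registry, assuming that whatever wraps the raw client also knows which channel it serves. `ClaimedChannelRegistry` and `MessagingTracingInterceptor` are hypothetical names; the latter is only a stand-in for a messaging-layer interceptor like TracingChannelInterceptor, not its real signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ClaimedChannelsDemo {
    public static void main(String[] args) {
        ClaimedChannelRegistry registry = new ClaimedChannelRegistry();
        List<String> spans = new ArrayList<>();
        MessagingTracingInterceptor interceptor = new MessagingTracingInterceptor(registry, spans);

        // A binder that wrapped its raw Kafka producer claims its output channel.
        registry.claim("orders-out");

        interceptor.preSend("orders-out");    // skipped: client-level tracing owns it
        interceptor.preSend("local-channel"); // traced at the messaging layer
        System.out.println(spans); // [messaging local-channel]
    }
}

// Hypothetical registry of channels already traced by client-level instrumentation
// (kafka-clients, amqp-client). Uses a concurrent set since claims and lookups
// may happen on different threads.
class ClaimedChannelRegistry {
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    void claim(String channelName) {
        claimed.add(channelName);
    }

    boolean isClaimed(String channelName) {
        return claimed.contains(channelName);
    }
}

// Hypothetical stand-in for a messaging-layer tracing interceptor.
class MessagingTracingInterceptor {
    private final ClaimedChannelRegistry registry;
    private final List<String> spans;

    MessagingTracingInterceptor(ClaimedChannelRegistry registry, List<String> spans) {
        this.registry = registry;
        this.spans = spans;
    }

    void preSend(String channelName) {
        if (registry.isClaimed(channelName)) {
            return; // avoid duplicate spans and the messaging-layer overhead
        }
        spans.add("messaging " + channelName);
    }
}
```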

Describe alternatives you've considered
I thought about going further with the spring-messaging instrumentation, doing more guessing about whether a destination might be Kafka, but I really think that's just digging a deeper hole. Another idea is to clone the kafka-clients instrumentation so that it works at some of the Spring abstractions, but that's a lot of work, and it still wouldn't see the properties we need, such as the real topic as opposed to a virtual routing one.

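I started looking at AOP, but I couldn't figure out how to get Sleuth to wrap the producer: createRawProducer is protected and is called internally from createKafkaProducer (see the stack trace below), and Spring's proxy-based AOP neither advises protected methods nor intercepts self-invocation.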
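The self-invocation part of the problem can be illustrated with plain JDK dynamic proxies (Spring typically uses CGLIB subclass proxies for classes, but calls a bean makes on `this` bypass the proxy either way). `Factory` and `FactoryImpl` are hypothetical; `createRawProducer` is even made public here, and it is still never seen by the proxy because it is only called internally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {
    public static void main(String[] args) {
        List<String> intercepted = new ArrayList<>();
        Factory target = new FactoryImpl();
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            intercepted.add(method.getName()); // record every call the proxy sees
            return method.invoke(target, methodArgs);
        };
        Factory proxied = (Factory) Proxy.newProxyInstance(
                Factory.class.getClassLoader(), new Class<?>[] {Factory.class}, handler);

        proxied.createProducer();
        // Only the outer call is seen; createRawProducer ran on the target via "this".
        System.out.println(intercepted); // [createProducer]
    }
}

// Hypothetical interface; createRawProducer is exposed only so a proxy *could* advise it.
interface Factory {
    String createProducer();

    String createRawProducer();
}

class FactoryImpl implements Factory {
    @Override
    public String createProducer() {
        return createRawProducer(); // self-invocation: bypasses any proxy around this object
    }

    @Override
    public String createRawProducer() {
        return "raw producer";
    }
}
```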

Additional context

For example, here's the stack trace that leads from an autowired StreamBridge into spring-kafka's createRawProducer:

  @Autowired StreamBridge streamBridge;
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:645)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:520)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:497)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:458)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:452)
	at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$createProducerMessageHandler$0(KafkaMessageChannelBinder.java:356)
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$3(KafkaTopicProvisioner.java:479)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:474)
	at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:354)
	at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:150)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:225)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
	at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
	at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:313)
	at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:282)
	at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
	at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
	at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
	at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
	at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
	at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
	at sleuth.webmvc.Frontend.main(Frontend.java:76)

Also, while I'm focusing on the producer side, I think the logic should be consolidated on the consumer side as well.

cc @garyrussell for ideas
