Description
Is your feature request related to a problem? Please describe.
We routinely get support requests and issues around Kafka naming conventions. This is compounded by the fact that in some places kafka-clients instrumentation is used, while in other cases spring-messaging is used with some hacks (e.g. TracingChannelInterceptor). Because things work at different abstractions, certain data is unusable in tagging and naming policies.
For example, SleuthKafkaAspect is tested to use kafka-clients instrumentation when a spring-kafka ProducerFactory is autowired.
However, kafka-clients instrumentation is not used when StreamBridge produces a message. Instead, a somewhat fragile spring-messaging variant uses headers to try to identify whether the transport is Kafka. As far as I know, SCS can't use kafka-clients instrumentation because of how it initializes: DefaultKafkaProducerFactory.createRawProducer should be wrapped by kafka-clients instrumentation, but the call trace from BindingService.bindProducer cannot be intercepted (it is not Spring-managed internally). Moreover, since the Kafka producer is literally new'ed up, there's no clean way I'm aware of to fix this.
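For reference, "wrapped by kafka-clients instrumentation" here means what Brave's kafka-clients module already provides: KafkaTracing.producer(Producer) returns a decorated producer that traces every send. A minimal sketch of the intended end state (assumes brave-instrumentation-kafka-clients is on the classpath; the bootstrap address is a placeholder):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import brave.Tracing;
import brave.kafka.clients.KafkaTracing;

class TracedProducerExample {
    static Producer<String, String> tracedProducer(Tracing tracing) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // The raw producer, as created today by DefaultKafkaProducerFactory.createRawProducer
        Producer<String, String> raw = new KafkaProducer<>(configs);

        // One wrap at the kafka-clients level: every send is traced no matter
        // which Spring abstraction (StreamBridge, KafkaTemplate, ...) calls it.
        return KafkaTracing.create(tracing).producer(raw);
    }
}
```

The point of the request is that nothing in the SCS binding path currently offers a hook where this wrap could happen.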
Describe the solution you'd like
I'd like spring-messaging instrumentation to opt out whenever kafka-clients (or amqp-client, for that matter!) instrumentation is in use. Part of that would be filling the gaps that currently prevent this. For example, short of using an agent, I'm not aware of any way to intercept this in DefaultKafkaProducerFactory:
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
    return new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
}
At the end of the day, if any of the several APIs (spring-cloud-stream, spring-messaging, spring-integration, spring-kafka) ends up calling kafka-clients, tracing would apply directly over the Kafka implementation of Producer, not the various wrappers above it. In other words, achieve the same effect as this, either via a new type or some aspect magic:
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
    Producer<K, V> result = new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
    return kafkaTracing.producer(result); // << one wrap to rule them all!
}
This might require a change here, and possibly also in spring-kafka, in order to control createRawProducer regardless of the call site. It may also require some registry to know which channels are "claimed", so that spring-messaging instrumentation overhead doesn't take place.
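The "claimed channels" registry could be very small. A minimal sketch, assuming all it needs to do is record destination names already traced at the kafka-clients level so the spring-messaging layer can skip them (the class and method names here are hypothetical, not existing Sleuth API):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical registry of destinations already traced at the kafka-clients
 * level, letting spring-messaging instrumentation opt out for them.
 */
final class ClaimedChannelRegistry {
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    /** Called by the lower-level (kafka-clients) instrumentation. */
    void claim(String destination) {
        claimed.add(destination);
    }

    /** Called by spring-messaging instrumentation before adding overhead. */
    boolean isClaimed(String destination) {
        return claimed.contains(destination);
    }

    public static void main(String[] args) {
        ClaimedChannelRegistry registry = new ClaimedChannelRegistry();
        registry.claim("orders-topic");
        System.out.println(registry.isClaimed("orders-topic")); // true
        System.out.println(registry.isClaimed("audit-topic"));  // false
    }
}
```

A concurrent set suffices because claims only ever accumulate during binding; the open question is where in the binding lifecycle the claim call would live.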
Describe alternatives you've considered
I thought about digging the hole deeper in the spring-messaging instrumentation, trying to do more guessing about whether a message might be Kafka, but I do think that's digging a hole. Another idea is to clone the kafka-clients instrumentation at one of the Spring abstractions, but that's labor-intensive and would also not see the actual properties needed, such as the real topic vs. a virtual routing one.
I started looking at AOP, but between the protected method and the shape of this stack trace, I was unable to figure out how to get things wrapped with Sleuth.
Additional context
For example, here's the stack trace leading to createRawProducer when this is autowired with spring-kafka:
@Autowired StreamBridge streamBridge;
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:645)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:520)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:497)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:458)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:452)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$createProducerMessageHandler$0(KafkaMessageChannelBinder.java:356)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$3(KafkaTopicProvisioner.java:479)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:474)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:354)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:150)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:225)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:313)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:282)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at sleuth.webmvc.Frontend.main(Frontend.java:76)
Also, while I'm focusing on the producer side, I think the same logic consolidation should happen on the consumer side.
cc @garyrussell for ideas