Skip to content

Commit

Permalink
Remove need of producer registration
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed Aug 7, 2023
1 parent a7ed395 commit 5a2c0cb
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public class Program

#### Producer Provider

With MassTransit v8.1, you can dynamically resolve a producer using `ITopicProducerProvider` (registered as `Scoped`). However, please note that producers' registration is still required.
With MassTransit v8.1, you can dynamically resolve a producer using `ITopicProducerProvider` (registered as `Scoped`).

```csharp
namespace KafkaProducer;
Expand All @@ -259,8 +259,6 @@ public class Program

x.AddRider(rider =>
{
rider.AddProducer<KafkaMessage>("topic-name");

rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ public interface IKafkaHostConfiguration :
KafkaSendTransportContext<TKey, TValue> CreateSendTransportContext<TKey, TValue>(IBusInstance busInstance, string topic)
where TValue : class;

IKafkaProducerSpecification CreateSpecification<TKey, TValue>(string topicName, Action<IKafkaProducerConfigurator<TKey, TValue>> configure)
where TValue : class;

IKafkaProducerSpecification CreateSpecification<TKey, TValue>(string topicName, ProducerConfig producerConfig,
Action<IKafkaProducerConfigurator<TKey, TValue>> configure)
where TValue : class;

IKafkaConsumerSpecification CreateSpecification<TKey, TValue>(string topicName, string groupId,
Action<IKafkaTopicReceiveEndpointConfigurator<TKey, TValue>> configure)
where TValue : class;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MassTransit.KafkaIntegration
namespace MassTransit
{
using System;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,7 @@ void IKafkaFactoryConfigurator.TopicProducer<TKey, TValue>(string topicName, Pro
if (producerConfig == null)
throw new ArgumentNullException(nameof(producerConfig));

var added = _producers.TryAdd(topicName, topic =>
{
var configurator = new KafkaProducerSpecification<TKey, TValue>(this, producerConfig, topicName, _oAuthBearerTokenRefreshHandler);
configurator.SetHeadersSerializer(_headersSerializer);
configure?.Invoke(configurator);

configurator.ConnectSendObserver(_sendObservers);
if (_configureSend != null)
configurator.ConfigureSend(_configureSend);
return configurator;
});
var added = _producers.TryAdd(topicName, topic => CreateSpecification(topic, producerConfig, configure));

if (!added)
throw new ConfigurationException($"A topic producer with the same key was already added: {topicName}");
Expand Down Expand Up @@ -271,14 +261,35 @@ public KafkaSendTransportContext<TKey, TValue> CreateSendTransportContext<TKey,
where TValue : class
{
if (!_producers.TryGetValue(topic, out var spec))
throw new ConfigurationException($"Producer for topic: {topic} is not configured.");
spec = CreateSpecification<TKey, TValue>(topic);

if (spec is IKafkaProducerSpecification<TKey, TValue> specification)
return specification.CreateSendTransportContext(busInstance);

throw new ConfigurationException($"Producer for topic: {topic} is not configured for ${typeof(Message<TKey, TValue>).Name} message");
}

public IKafkaProducerSpecification CreateSpecification<TKey, TValue>(string topicName,
Action<IKafkaProducerConfigurator<TKey, TValue>> configure = null)
where TValue : class
{
return CreateSpecification(topicName, new ProducerConfig(), configure);
}

public IKafkaProducerSpecification CreateSpecification<TKey, TValue>(string topicName, ProducerConfig producerConfig,
Action<IKafkaProducerConfigurator<TKey, TValue>> configure = null)
where TValue : class
{
var configurator = new KafkaProducerSpecification<TKey, TValue>(this, producerConfig, topicName, _oAuthBearerTokenRefreshHandler);
configurator.SetHeadersSerializer(_headersSerializer);
configure?.Invoke(configurator);

configurator.ConnectSendObserver(_sendObservers);
if (_configureSend != null)
configurator.ConfigureSend(_configureSend);
return configurator;
}

public IKafkaConsumerSpecification CreateSpecification<TKey, TValue>(string topicName, string groupId,
Action<IKafkaTopicReceiveEndpointConfigurator<TKey, TValue>> configure)
where TValue : class
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
namespace MassTransit.KafkaIntegration
namespace MassTransit
{
using System;
using Confluent.Kafka;
using DependencyInjection;
using KafkaIntegration;
using Microsoft.Extensions.DependencyInjection;


public static class TopicProducerProviderExtensions
public static class KafkaTopicProducerProviderExtensions
{
public static ITopicProducer<TValue> GetProducer<TValue>(this ITopicProducerProvider provider, Uri address)
where TValue : class
Expand All @@ -22,7 +23,7 @@ public static ITopicProducer<TValue> GetProducer<TKey, TValue>(this ITopicProduc
return new KeyedTopicProducer<TKey, TValue>(producer, keyResolver);
}

public static ITopicProducerProvider GetScopedTopicProducerProvider(this ITopicProducerProvider producerProvider, IServiceProvider provider)
internal static ITopicProducerProvider GetScopedTopicProducerProvider(this ITopicProducerProvider producerProvider, IServiceProvider provider)
{
var contextProvider = provider.GetService<IScopedConsumeContextProvider>();
return contextProvider is { HasContext: true }
Expand Down
4 changes: 0 additions & 4 deletions tests/MassTransit.KafkaIntegration.Tests/Producer_Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public async Task Should_receive_messages()
r.AddConsumer<TestKafkaMessageConsumer<KafkaMessage>>();

r.AddProducer<KafkaMessage>(Topic, producerConfig);

r.UsingKafka((context, k) =>
{
k.TopicEndpoint<KafkaMessage>(Topic, consumerConfig, c =>
Expand Down Expand Up @@ -110,7 +109,6 @@ public class Producer_Provider_Specs
public async Task Should_receive_messages()
{
var consumerConfig = new ConsumerConfig { GroupId = nameof(Producer_Provider_Specs) };
var producerConfig = new ProducerConfig();

await using var provider = new ServiceCollection()
.ConfigureKafkaTestOptions(options =>
Expand All @@ -125,8 +123,6 @@ public async Task Should_receive_messages()
x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(15));
x.AddRider(r =>
{
r.AddProducer<KafkaMessage>(Topic, producerConfig);

r.UsingKafka((_, k) =>
{
k.TopicEndpoint<KafkaMessage>(Topic, consumerConfig, c =>
Expand Down

0 comments on commit 5a2c0cb

Please sign in to comment.