Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,12 @@ public static class KafkaInstrumentation
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs#L97-L110' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_kafkainstrumentation_middleware' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Connecting to Multiple Brokers <Badge type="tip" text="4.7" />

Wolverine supports interacting with multiple Kafka brokers within one application like this:

snippet: sample_using_multiple_kafka_brokers

Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply
for the broker. So in the example above, you might see `Uri` values for `emea://colors` or `americas://red`.
4 changes: 4 additions & 0 deletions src/Testing/Wolverine.ComplianceTests/Compliance/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ public void Handle(ColorChosen message, ColorHistory history, Envelope envelope)
{
history.Name = message.Name;
history.Envelope = envelope;

Received++;
}

public static int Received { get; set; }
}

public class ColorHistory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ public static async Task configure()

#endregion
}

public static async Task use_named_brokers()
{
#region sample_using_multiple_kafka_brokers

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092");
opts.AddNamedKafkaBroker(new BrokerName("americas"), "americas-kafka:9092");
opts.AddNamedKafkaBroker(new BrokerName("emea"), "emea-kafka:9092");

// Just publish all messages to Kafka topics
// based on the message type (or message attributes)
// This will get fancier in the near future
opts.PublishAllMessages().ToKafkaTopicsOnNamedBroker(new BrokerName("americas"));

// Or explicitly make subscription rules
opts.PublishMessage<ColorMessage>()
.ToKafkaTopicOnNamedBroker(new BrokerName("emea"), "colors");

// Listen to topics
opts.ListenToKafkaTopicOnNamedBroker(new BrokerName("americas"), "red");
// Other configuration
}).StartAsync();

#endregion
}
}

#region sample_KafkaInstrumentation_middleware
Expand All @@ -109,4 +137,5 @@ public static void Before(Envelope envelope, ILogger logger)
}
}

#endregion
#endregion

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace Wolverine.Kafka.Tests;

public class end_to_end_with_named_broker
{
private readonly ITestOutputHelper _output;
private readonly BrokerName theName = new BrokerName("other");

public end_to_end_with_named_broker(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task send_message_to_and_receive_through_kafka_with_inline_receivers()
{
var topicName = Guid.NewGuid().ToString();
using var publisher = WolverineHost.For(opts =>
{
opts.AddNamedKafkaBroker(theName, "localhost:9092").AutoProvision().AutoPurgeOnStartup();

opts.PublishAllMessages()
.ToKafkaTopicOnNamedBroker(theName, topicName)
.SendInline();
});


using var receiver = WolverineHost.For(opts =>
{
opts.AddNamedKafkaBroker(theName, "localhost:9092").AutoProvision();

opts.ListenToKafkaTopicOnNamedBroker(theName, topicName).ProcessInline().Named(topicName);
opts.Services.AddSingleton<ColorHistory>();

});

ColorHandler.Received = new();

Task.Run(async () =>
{
for (int i = 0; i < 10000; i++)
{
await publisher.SendAsync(new ColorChosen { Name = "blue" });
}
});



await ColorHandler.Received.Task.TimeoutAfterAsync(10000);
}

}

public record RequestId(Guid Id);
public record ResponseId(Guid Id);

public static class RequestIdHandler
{
public static ResponseId Handle(RequestId message) => new ResponseId(message.Id);
}

public class ColorHandler
{
public void Handle(ColorChosen message, ColorHistory history, Envelope envelope)
{
history.Name = message.Name;
history.Envelope = envelope;

Received.TrySetResult(true);
}

public static TaskCompletionSource<bool> Received { get; set; } = new();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ public class KafkaTransport : BrokerTransport<KafkaTopic>
public AdminClientConfig AdminClientConfig { get; } = new();
public Action<AdminClientBuilder> ConfigureAdminClientBuilders { get; internal set; } = _ => {};

public KafkaTransport() : base("kafka", "Kafka Topics")
public KafkaTransport() : this("kafka")
{

}

public KafkaTransport(string protocol) : base(protocol, "Kafka Topics")
{
Topics = new Cache<string, KafkaTopic>(topicName => new KafkaTopic(this, topicName, EndpointRole.Application));
}
Expand Down
82 changes: 79 additions & 3 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ public static class KafkaTransportExtensions
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints)
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints, BrokerName? name = null)
{
var transports = endpoints.As<WolverineOptions>().Transports;

return transports.GetOrCreate<KafkaTransport>();
return transports.GetOrCreate<KafkaTransport>(name);
}

/// <summary>
Expand All @@ -38,6 +38,25 @@ public static KafkaTransportExpression UseKafka(this WolverineOptions options, s

return new KafkaTransportExpression(transport, options);
}

/// <summary>
/// Configure connection and authentication information for a secondary Kafka broker
/// to this application. Only use this overload if your Wolverine application needs to talk
/// to two or more Kafka brokers
/// </summary>
/// <param name="options"></param>
/// <param name="name">Name of the additional Rabbit Mq broker</param>
/// <param name="configure"></param>
public static KafkaTransportExpression AddNamedKafkaBroker(this WolverineOptions options, BrokerName name,
string bootstrapServers)
{
var transport = options.KafkaTransport(name);
transport.ConsumerConfig.BootstrapServers = bootstrapServers;
transport.ProducerConfig.BootstrapServers = bootstrapServers;
transport.AdminClientConfig.BootstrapServers = bootstrapServers;

return new KafkaTransportExpression(transport, options);
}

/// <summary>
/// Make additive configuration to the Kafka integration for this application
Expand All @@ -56,7 +75,7 @@ public static KafkaTransportExpression ConfigureKafka(this WolverineOptions opti
/// Listen for incoming messages at the designated Kafka topic name
/// </summary>
/// <param name="endpoints"></param>
/// <param name="topicName">The name of the Rabbit MQ queue</param>
/// <param name="topicName">The name of the Kafka topic</param>
/// <param name="configure">
/// Optional configuration for this Rabbit Mq queue if being initialized by Wolverine
/// <returns></returns>
Expand All @@ -70,6 +89,24 @@ public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOption

return new KafkaListenerConfiguration(endpoint);
}

/// <summary>
/// Listen for incoming messages at the designated Kafka topic name
/// </summary>
/// <param name="endpoints"></param>
/// <param name="name">The name of the ancillary Kafka broker</param>
/// <param name="topicName">The name of the Rabbit MQ queue</param>
/// <returns></returns>
public static KafkaListenerConfiguration ListenToKafkaTopicOnNamedBroker(this WolverineOptions endpoints, BrokerName name, string topicName)
{
var transport = endpoints.KafkaTransport(name);

var endpoint = transport.Topics[topicName];
endpoint.EndpointName = topicName;
endpoint.IsListener = true;

return new KafkaListenerConfiguration(endpoint);
}

/// <summary>
/// Publish messages to an Kafka topic
Expand All @@ -89,6 +126,25 @@ public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpressio

return new KafkaSubscriberConfiguration(topic);
}

/// <summary>
/// Publish messages to an Kafka topic
/// </summary>
/// <param name="publishing"></param>
/// <param name="topicName"></param>
/// <returns></returns>
public static KafkaSubscriberConfiguration ToKafkaTopicOnNamedBroker(this IPublishToExpression publishing, BrokerName name, string topicName)
{
var transports = publishing.As<PublishingExpression>().Parent.Transports;
var transport = transports.GetOrCreate<KafkaTransport>(name);

var topic = transport.Topics[topicName];

// This is necessary unfortunately to hook up the subscription rules
publishing.To(topic.Uri);

return new KafkaSubscriberConfiguration(topic);
}

/// <summary>
/// Publish messages to Kafka topics based on Wolverine's rules for deriving topic
Expand All @@ -109,6 +165,26 @@ public static KafkaSubscriberConfiguration ToKafkaTopics(this IPublishToExpressi

return new KafkaSubscriberConfiguration(topic);
}

/// <summary>
/// Publish messages to Kafka topics based on Wolverine's rules for deriving topic
/// names from a message type
/// </summary>
/// <param name="publishing"></param>
/// <param name="topicName"></param>
/// <returns></returns>
public static KafkaSubscriberConfiguration ToKafkaTopicsOnNamedBroker(this IPublishToExpression publishing, BrokerName name)
{
var transports = publishing.As<PublishingExpression>().Parent.Transports;
var transport = transports.GetOrCreate<KafkaTransport>(name);

var topic = transport.Topics[KafkaTopic.WolverineTopicsName];

// This is necessary unfortunately to hook up the subscription rules
publishing.To(topic.Uri);

return new KafkaSubscriberConfiguration(topic);
}

internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message<string, byte[]> message)
{
Expand Down
Loading