Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Source/Messaging/MessagingActorEntryOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace Fossa.Messaging;

/// <summary>
/// Represents the options for a messaging actor entry.
/// </summary>
public class MessagingActorEntryOptions
{
/// <summary>
/// Gets or sets the name.
/// </summary>
public string? Name { get; set; }

/// <summary>
/// Gets or sets the plain text value.
/// </summary>
public string? PlainTextValue { get; set; }

/// <summary>
/// Gets or sets the Base64 value.
/// </summary>
public string? Base64Value { get; set; }
}
2 changes: 1 addition & 1 deletion Source/Messaging/MessagingOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class MessagingOptions
/// <summary>
/// Gets the actor.
/// </summary>
public Dictionary<string, string>? Actor { get; init; }
public IReadOnlyList<MessagingActorEntryOptions>? Actor { get; init; }

/// <summary>
/// Gets the topic.
Expand Down
36 changes: 35 additions & 1 deletion Source/Messaging/ProducerProvider.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace Fossa.Messaging;

using System.Text;
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using static LanguageExt.Prelude;

/// <summary>
/// Provides an <see cref="IProducer{TKey, TValue}"/>.
Expand Down Expand Up @@ -40,7 +42,12 @@ public void Dispose()
#pragma warning restore CA1508 // Avoid dead conditional code
{
var serviceIdentity = this.serviceIdentityProvider.GetIdentity();
var producerConfig = new ProducerConfig(this.options.Value.Actor)
var messagingActorEntryOptions = this.options.Value.Actor ?? [];
var messagingActorOptions = messagingActorEntryOptions
.ToDictionary(
k => k?.Name ?? throw new InvalidOperationException("One of the Message Actor Entry Options Name is not provided."),
ResolveActorEntryValue);
var producerConfig = new ProducerConfig(messagingActorOptions)
{
ClientId = serviceIdentity.ToString(),
};
Expand Down Expand Up @@ -69,4 +76,31 @@ protected virtual void Dispose(bool disposing)
this.disposedValue = true;
}
}

private static string ResolveActorEntryValue(MessagingActorEntryOptions? options)
{
if (options is null)
{
throw new InvalidOperationException("One of the Message Actor Entry Options is null.");
}

var providedValues = Seq(
Tuple(nameof(options.PlainTextValue), Optional(options.PlainTextValue)),
Tuple(nameof(options.Base64Value), Optional(options.Base64Value)
.Map(x => Encoding.UTF8.GetString(Convert.FromBase64String(x)))))
.Choose(x => x.Item2.Map(v => Tuple(x.Item1, v)));

if (providedValues.Count == 1)
{
return providedValues.Single().Item2;
}
else if (providedValues.Count == 0)
{
throw new InvalidOperationException($"Messaging actor entry '{options.Name}'. One of the value properties must be set.");
}
else
{
throw new InvalidOperationException($"Messaging actor entry '{options.Name}' has multiple value properties set. Only one of these '{providedValues.Select(x => x.Item1)}' properties should be set.");
}
}
}
71 changes: 71 additions & 0 deletions Tests/Messaging.Test/MessagePublisherTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace Fossa.Messaging.Test;

using Autofac;
using Autofac.Extensions.DependencyInjection;
using Fossa.Messaging.Messages.Events;
using IdGen.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Time.Testing;
using NSubstitute;
using TIKSN.DependencyInjection;
using TIKSN.Identity;
using Xunit;

[Trait("Category", "Integration")]
public class MessagePublisherTests
{
private readonly IServiceProvider serviceProvider;

public MessagePublisherTests()
{
var configuration = new ConfigurationBuilder()
.AddUserSecrets<MessagePublisherTests>()
.Build();
var services = new ServiceCollection();
_ = services.AddMessaging(configuration, "Fossa", Seq("Messaging", "Test"));
_ = services.AddFrameworkCore();
_ = services.AddIdGen(9);

var fakeTimeProvider = new FakeTimeProvider(
new DateTimeOffset(2022, 9, 24, 0, 0, 0, TimeSpan.Zero));
_ = services.AddSingleton<TimeProvider>(fakeTimeProvider);

var serviceIdentityProvider = Substitute.For<IServiceIdentityProvider>();
_ = serviceIdentityProvider
.GetIdentity().Returns(
new ServiceIdentity(
applicationName: "Fossa",
componentNames: Seq("Messaging", "Test"),
instanceId: ServiceInstanceId.Create(Ulid.NewUlid())));

_ = services.AddSingleton(serviceIdentityProvider);

ContainerBuilder containerBuilder = new();
_ = containerBuilder.RegisterModule<CoreModule>();
containerBuilder.Populate(services);

this.serviceProvider = new AutofacServiceProvider(containerBuilder.Build());
}

[Fact]
public async Task GivenPublisherAndMessage_WhenMessageIsPublished_ThenDeliveryShouldSucceedAsync()
{
// Arrange
var messagePublisher = this.serviceProvider.GetRequiredService<IMessagePublisher>();
const string topic = "test";

var message = new CompanyDeletedProtoEvent { CompanyId = 123L };

// Act

var deliveryResult = await messagePublisher.PublishAsync(message, message.CompanyId, "Company", message.CompanyId, default).ConfigureAwait(true);

// Assert

Assert.NotNull(deliveryResult);
Assert.NotNull(deliveryResult.Key);
Assert.NotNull(deliveryResult.Value);
Assert.Equal(topic, deliveryResult.Topic);
}
}
1 change: 1 addition & 0 deletions Tests/Messaging.Test/Messaging.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

<PropertyGroup Label="Build">
<TargetFramework>net10.0</TargetFramework>
<UserSecretsId>ad336f2b-09b8-4c53-96cb-ed9913b583f1</UserSecretsId>
</PropertyGroup>

<ItemGroup Label="Project References">
Expand Down
1 change: 1 addition & 0 deletions build.cake
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Task("Test")
Blame = true,
Collectors = new string[] { "Code Coverage", "XPlat Code Coverage" },
Configuration = configuration,
Filter = !BuildSystem.IsLocalBuild ? "Category!=Integration" : null,
Loggers = new string[]
{
$"trx;LogFileName={project.GetFilenameWithoutExtension()}.trx",
Expand Down
Loading