Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Source/Messaging/IServiceIdentityProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Fossa.Messaging;

using TIKSN.Identity;

/// <summary>
/// Provides service identity information.
/// </summary>
public interface IServiceIdentityProvider
{
/// <summary>
/// Gets the service identity.
/// </summary>
/// <returns>The service identity.</returns>
public ServiceIdentity GetIdentity();
}
27 changes: 24 additions & 3 deletions Source/Messaging/ProducerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ namespace Fossa.Messaging;
/// <remarks>
/// Initializes a new instance of the <see cref="ProducerProvider"/> class.
/// </remarks>
/// <param name="serviceIdentityProvider">The service identity provider.</param>
/// <param name="options">The options.</param>
public class ProducerProvider(IOptions<MessagingOptions> options) : IProducerProvider, IDisposable
public class ProducerProvider(
IServiceIdentityProvider serviceIdentityProvider,
IOptions<MessagingOptions> options) : IProducerProvider, IDisposable
{
private readonly IOptions<MessagingOptions> options = options ?? throw new ArgumentNullException(nameof(options));
private readonly Lock producerLock = new();
private readonly IServiceIdentityProvider serviceIdentityProvider = serviceIdentityProvider ?? throw new ArgumentNullException(nameof(serviceIdentityProvider));

private bool disposedValue;
private IProducer<string?, byte[]>? producer;
private volatile IProducer<string?, byte[]>? producer;

/// <inheritdoc/>
public void Dispose()
Expand All @@ -27,13 +32,24 @@ public void Dispose()
/// <inheritdoc/>
public IProducer<string?, byte[]> GetProducer()
{
ObjectDisposedException.ThrowIf(this.disposedValue, this);

if (this.producer == null)
{
lock (this.producerLock)
{
#pragma warning disable CA1508 // Avoid dead conditional code
this.producer ??= new ProducerBuilder<string?, byte[]>(this.options.Value.Actor).Build();
if (this.producer == null)
#pragma warning restore CA1508 // Avoid dead conditional code
{
var serviceIdentity = this.serviceIdentityProvider.GetIdentity();
var producerConfig = new ProducerConfig(this.options.Value.Actor)
{
ClientId = serviceIdentity.ToString(),
};
var producerBuilder = new ProducerBuilder<string?, byte[]>(producerConfig);
this.producer = producerBuilder.Build();
}
}
}

Expand All @@ -48,6 +64,11 @@ protected virtual void Dispose(bool disposing)
{
if (!this.disposedValue)
{
if (disposing)
{
this.producer?.Dispose();
}

this.disposedValue = true;
}
}
Expand Down
119 changes: 119 additions & 0 deletions Tests/Messaging.Test/ServiceIdentityProviderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
namespace Fossa.Messaging.Test;

using Autofac;
using Autofac.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Time.Testing;
using NSubstitute;
using TIKSN.DependencyInjection;
using TIKSN.Identity;
using Xunit;

public class ServiceIdentityProviderTests
{
private readonly IServiceProvider serviceProvider;

public ServiceIdentityProviderTests()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
{
"Messaging:Actor:bootstrap.servers",
"app.f.aivencloud.com:19761"
},
{
"Messaging:Actor:security.protocol",
"Ssl"
},
{
"Messaging:Actor:sasl.mechanism",
"ScramSha256"
},
{
"Messaging:Actor:ssl.ca.pem",
"test-ca"
},
{
"Messaging:Actor:ssl.certificate.pem",
"test-cert"
},
{
"Messaging:Actor:ssl.key.pem",
"test-key"
},
{
"Messaging:Topic",
"test"
},
})
.Build();
var services = new ServiceCollection();
_ = services.AddMessaging(configuration);
_ = services.AddFrameworkCore();

var fakeTimeProvider = new FakeTimeProvider(
new DateTimeOffset(2022, 9, 24, 0, 0, 0, TimeSpan.Zero));
_ = services.AddSingleton<TimeProvider>(fakeTimeProvider);

var serviceIdentityProvider = Substitute.For<IServiceIdentityProvider>();
_ = serviceIdentityProvider
.GetIdentity().Returns(
new ServiceIdentity(
applicationName: "Fossa",
componentNames: Seq("Messaging", "Test"),
instanceId: ServiceInstanceId.Create(Ulid.NewUlid())));

_ = services.AddSingleton(serviceIdentityProvider);

ContainerBuilder containerBuilder = new();
_ = containerBuilder.RegisterModule<CoreModule>();
containerBuilder.Populate(services);

this.serviceProvider = new AutofacServiceProvider(containerBuilder.Build());
}

[Fact]
public void GivenServiceIdentity_WhenPropertiesAccessed_ThenReturnsExpectedValues()
{
// Arrange
var serviceIdentityProvider = this.serviceProvider.GetRequiredService<IServiceIdentityProvider>();
var identity = serviceIdentityProvider.GetIdentity();

// Act & Assert
Assert.NotEmpty(identity.ServicePath);
Assert.Contains("Messaging.Test:", identity.ToString(), StringComparison.OrdinalIgnoreCase);
Assert.Contains(':', identity.ToString());
}

[Fact]
public void GivenServiceIdentityProvider_WhenGetIdentity_ThenReturnsValidIdentity()
{
// Arrange
var serviceIdentityProvider = this.serviceProvider.GetRequiredService<IServiceIdentityProvider>();

// Act
var identity = serviceIdentityProvider.GetIdentity();

// Assert
Assert.NotNull(identity);
Assert.False(string.IsNullOrEmpty(identity.ApplicationName));
Assert.False(string.IsNullOrEmpty(identity.ServicePath));
Assert.False(string.IsNullOrEmpty(identity.InstanceId.ToString()));
}

[Fact]
public void GivenServiceIdentityProvider_WhenGetIdentityCalledMultipleTimes_ThenReturnsSameInstance()
{
// Arrange
var serviceIdentityProvider = this.serviceProvider.GetRequiredService<IServiceIdentityProvider>();

// Act
var identity1 = serviceIdentityProvider.GetIdentity();
var identity2 = serviceIdentityProvider.GetIdentity();

// Assert
Assert.Same(identity1, identity2);
}
}
Loading