Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Source/Messaging/IMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ public interface IMessagePublisher
/// Publishes a message to the specified topic.
/// </summary>
/// <param name="message">The message to publish.</param>
/// <param name="companyId">The company identifier.</param>
/// <param name="entityName">The entity name.</param>
/// <param name="entityId">The entity identifier.</param>
/// <param name="cancellationToken">The Cancellation Token.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the asynchronous operation.</returns>
public Task<DeliveryResult<string?, byte[]>> PublishAsync(IMessage message, CancellationToken cancellationToken);
public Task<DeliveryResult<string?, byte[]>> PublishAsync(
IMessage message,
long companyId,
string entityName,
long entityId,
CancellationToken cancellationToken);
}
14 changes: 14 additions & 0 deletions Source/Messaging/MessageMap.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Fossa.Messaging;

using Fossa.Messaging.Messages.Events;
using Google.Protobuf;

/// <summary>
/// Provides a mapping between message types and their corresponding integer identifiers for message serialization and
Expand Down Expand Up @@ -45,6 +46,19 @@ public int GetMessageTypeID(Type messageType) =>
this.messageTypeBiMap.Find(messageType)
.IfNone(() => throw new KeyNotFoundException($"Message type ID not found for type {messageType.FullName}"));

/// <summary>
/// Retrieves the unique identifier associated with the specified message.
/// </summary>
/// <param name="message">The message for which to obtain the identifier.</param>
/// <returns>The identifier corresponding to the specified message type.</returns>
/// <exception cref="KeyNotFoundException">Thrown if the specified message type does not have an associated identifier.</exception>
public int GetMessageTypeID(IMessage message)
{
ArgumentNullException.ThrowIfNull(message);

return this.GetMessageTypeID(message.GetType());
}

private static void RegisterMessageType<TKey>(int value, ref BiMap<ComparableType, int> messageTypeBiMap) =>
messageTypeBiMap = messageTypeBiMap.Add(typeof(TKey), value);

Expand Down
16 changes: 9 additions & 7 deletions Source/Messaging/MessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ namespace Fossa.Messaging;
/// <summary>
/// Publishes messages to a message broker.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="MessagePublisher"/> class.
/// </remarks>
/// <param name="serviceIdentityProvider">The service identity provider.</param>
/// <param name="producerProvider">The producer provider.</param>
/// <param name="messageMap">The Message Map.</param>
Expand All @@ -37,7 +34,12 @@ public class MessagePublisher(
private readonly TimeProvider timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));

/// <inheritdoc/>
public async Task<DeliveryResult<string?, byte[]>> PublishAsync(IMessage message, CancellationToken cancellationToken)
public async Task<DeliveryResult<string?, byte[]>> PublishAsync(
IMessage message,
long companyId,
string entityName,
long entityId,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(message);

Expand All @@ -49,12 +51,12 @@ public class MessagePublisher(
Id = Ulid.NewUlid().ToString(),
DataSchema = null,
Source = new Uri($"/fossa/agent/{this.serviceIdentityProvider.GetIdentity()}", UriKind.Relative),
Subject = "Entity/123",
Subject = $"{entityName}/{entityId}",
Time = this.timeProvider.GetUtcNow(),
Type = this.messageMap.GetMessageTypeID(message.GetType()).ToString(CultureInfo.InvariantCulture),
Type = this.messageMap.GetMessageTypeID(message).ToString(CultureInfo.InvariantCulture),
DataContentType = "application/cloudevents+protobuf",
}
.SetPartitionKey("Company/123");
.SetPartitionKey($"Company/{companyId}");

var kafkaMessage = cloudEvent.ToKafkaMessage(ContentMode.Structured, this.cloudEventFormatter);

Expand Down
3 changes: 0 additions & 3 deletions Source/Messaging/ProducerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ namespace Fossa.Messaging;
/// <summary>
/// Provides an <see cref="IProducer{TKey, TValue}"/>.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="ProducerProvider"/> class.
/// </remarks>
/// <param name="serviceIdentityProvider">The service identity provider.</param>
/// <param name="options">The options.</param>
public class ProducerProvider(
Expand Down
Loading