I am experimenting with CloudEvents and have decided to persist my events to Azure Cosmos DB. I had a few issues with serialisation but eventually came up with something like this, which successfully writes to Cosmos:
```csharp
public override async Task<CloudEvent> AddAsync(CloudEvent item, CancellationToken cancellationToken = default)
{
    var container = CosmosContainerFactory.GetContainer(ContainerName);
    try
    {
        // Encode the event as a structured-mode JSON payload.
        var formatter = new JsonEventFormatter();
        var bytes = formatter.EncodeStructuredModeMessage(item, out _);
        await using var stream = BinaryData.FromBytes(bytes).ToStream();
        var response = await container.CreateItemStreamAsync(
            stream,
            GetPartitionKey(item),
            cancellationToken: cancellationToken
        );
        // The stream API returns a status code instead of throwing,
        // so this call is what raises CosmosException on failure.
        response.EnsureSuccessStatusCode();
        return item;
    }
    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
    {
        throw new CosmosItemAlreadyExistsException(IdProvider.GetId(item));
    }
}
```
I am using the System.Text.Json-based `JsonEventFormatter` here to encode the message. It feels a bit janky, but this seemed to be the only simple way to serialise the event without using any of the `ToHttpContent` stuff (which I don't want or need). Firstly, is there a better way of doing this?
Next, my main issue is that when retrieving an event from my Cosmos container, deserialisation fails because the Cosmos system properties (`_rid`, `_self`, `_etag`, `_attachments`, `_ts`) contain underscores, which the CloudEvents spec and the SDK's validation prohibit in attribute names. I can see from looking at the `JsonEventFormatter` code that these properties are being treated as event attributes. I want to be able to ignore these properties, but there doesn't seem to be any way of doing this because:
- `CloudEvent` is a sealed class, so I can't do any fancy inheritance and `[JsonIgnore]` combination.
- The event formatter does not expose any way of ignoring properties. It just treats any property which isn't a 'standard' property as a custom event attribute.
FWIW, here is my code for fetching an event from Cosmos:
```csharp
public override async Task<CloudEvent?> GetByIdAsync(
    string id,
    PartitionKey partitionKey,
    CancellationToken cancellationToken = default)
{
    var container = CosmosContainerFactory.GetContainer(ContainerName);
    try
    {
        var response = await container.ReadItemStreamAsync(
            id,
            partitionKey,
            cancellationToken: cancellationToken
        );
        // The stream API reports a missing item through the status code
        // rather than by throwing, so check for it explicitly.
        if (response.StatusCode == HttpStatusCode.NotFound)
        {
            return null;
        }
        var formatter = new JsonEventFormatter();
        // This is the call that fails on the Cosmos system properties.
        var cloudEvent = await formatter.DecodeStructuredModeMessageAsync(response.Content, new ContentType("application/json"), null);
        return cloudEvent;
    }
    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
    {
        return null;
    }
}
```
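
One workaround I could imagine, as a rough sketch only: strip the top-level properties whose names start with an underscore before handing the stream to the formatter. `CosmosSystemPropertyStripper` and `StripSystemProperties` are hypothetical names made up for illustration, not part of the CloudEvents SDK or the Cosmos SDK, and the sketch assumes every underscore-prefixed top-level property is Cosmos metadata that can safely be dropped.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;

// Hypothetical helper (not part of any SDK): removes Cosmos-style system
// properties from a JSON document before it is decoded as a CloudEvent.
public static class CosmosSystemPropertyStripper
{
    // Assumption: any top-level property starting with '_' is Cosmos
    // metadata (_rid, _self, _etag, _attachments, _ts) and can be discarded.
    public static MemoryStream StripSystemProperties(Stream cosmosContent)
    {
        var root = JsonNode.Parse(cosmosContent) as JsonObject
            ?? throw new InvalidDataException("Expected a JSON object at the document root.");

        // Collect the names first; the object cannot be modified while it is enumerated.
        var systemKeys = root
            .Select(property => property.Key)
            .Where(key => key.StartsWith("_", StringComparison.Ordinal))
            .ToList();

        foreach (var key in systemKeys)
        {
            root.Remove(key);
        }

        // Write the cleaned document to a fresh, rewound stream.
        var output = new MemoryStream();
        using (var writer = new Utf8JsonWriter(output))
        {
            root.WriteTo(writer);
        }
        output.Position = 0;
        return output;
    }
}
```

In `GetByIdAsync` this would mean passing `CosmosSystemPropertyStripper.StripSystemProperties(response.Content)` to `DecodeStructuredModeMessageAsync` instead of `response.Content`. It costs an extra parse and copy on every read, which is why a built-in way to ignore properties in the formatter would still be preferable.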