Skip to content

Implement underscore prefixes for AMQP #236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions src/CloudNative.CloudEvents.Amqp/AmqpExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ namespace CloudNative.CloudEvents.Amqp
/// </summary>
public static class AmqpExtensions
{
internal const string AmqpHeaderPrefix = "cloudEvents:";
// This is internal in CloudEventsSpecVersion.
private const string SpecVersionAttributeName = "specversion";

internal const string SpecVersionAmqpHeader = AmqpHeaderPrefix + "specversion";
internal const string AmqpHeaderUnderscorePrefix = "cloudEvents_";
internal const string AmqpHeaderColonPrefix = "cloudEvents:";

internal const string SpecVersionAmqpHeaderWithUnderscore = AmqpHeaderUnderscorePrefix + SpecVersionAttributeName;
internal const string SpecVersionAmqpHeaderWithColon = AmqpHeaderColonPrefix + SpecVersionAttributeName;

/// <summary>
/// Indicates whether this <see cref="Message"/> holds a single CloudEvent.
Expand All @@ -32,7 +37,8 @@ public static class AmqpExtensions
/// <returns>true, if the request is a CloudEvent</returns>
public static bool IsCloudEvent(this Message message) =>
HasCloudEventsContentType(Validation.CheckNotNull(message, nameof(message)), out _) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader);
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeaderWithUnderscore) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeaderWithColon);

/// <summary>
/// Converts this AMQP message into a CloudEvent object.
Expand Down Expand Up @@ -69,7 +75,8 @@ public static CloudEvent ToCloudEvent(
else
{
var propertyMap = message.ApplicationProperties.Map;
if (!propertyMap.TryGetValue(SpecVersionAmqpHeader, out var versionId))
if (!propertyMap.TryGetValue(SpecVersionAmqpHeaderWithUnderscore, out var versionId) &&
!propertyMap.TryGetValue(SpecVersionAmqpHeaderWithColon, out versionId))
{
throw new ArgumentException("Request is not a CloudEvent");
}
Expand All @@ -84,11 +91,14 @@ public static CloudEvent ToCloudEvent(

foreach (var property in propertyMap)
{
if (!(property.Key is string key && key.StartsWith(AmqpHeaderPrefix)))
if (!(property.Key is string key &&
(key.StartsWith(AmqpHeaderColonPrefix) || key.StartsWith(AmqpHeaderUnderscorePrefix))))
{
continue;
}
string attributeName = key.Substring(AmqpHeaderPrefix.Length).ToLowerInvariant();
// Note: both prefixes have the same length. If we ever need any prefixes with a different length, we'll need to know which
// prefix we're looking at.
string attributeName = key.Substring(AmqpHeaderUnderscorePrefix.Length).ToLowerInvariant();

// We've already dealt with the spec version.
if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name)
Expand Down Expand Up @@ -142,17 +152,43 @@ private static bool HasCloudEventsContentType(Message message, out string? conte
}

/// <summary>
/// Converts a CloudEvent to <see cref="Message"/>.
/// Converts a CloudEvent to <see cref="Message"/> using the default property prefix. Versions released prior to March 2023
/// use a default property prefix of "cloudEvents:". Versions released from March 2023 onwards use a property prefix of "cloudEvents_".
/// Code wishing to express the prefix explicitly should use <see cref="ToAmqpMessageWithColonPrefix(CloudEvent, ContentMode, CloudEventFormatter)"/> or
/// <see cref="ToAmqpMessageWithUnderscorePrefix(CloudEvent, ContentMode, CloudEventFormatter)"/>.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) =>
ToAmqpMessage(cloudEvent, contentMode, formatter, AmqpHeaderColonPrefix);

/// <summary>
/// Converts a CloudEvent to <see cref="Message"/> using a property prefix of "cloudEvents_". This prefix was introduced as the preferred
/// prefix for the AMQP binding in August 2022.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
public static Message ToAmqpMessageWithUnderscorePrefix(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) =>
ToAmqpMessage(cloudEvent, contentMode, formatter, AmqpHeaderUnderscorePrefix);

/// <summary>
/// Converts a CloudEvent to <see cref="Message"/> using a property prefix of "cloudEvents:". This prefix
/// is a legacy retained only for compatibility purposes; it can't be used by JMS due to constraints in JMS property names.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message ToAmqpMessageWithColonPrefix(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) =>
ToAmqpMessage(cloudEvent, contentMode, formatter, AmqpHeaderColonPrefix);

private static Message ToAmqpMessage(CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, string prefix)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
Validation.CheckNotNull(formatter, nameof(formatter));

var applicationProperties = MapHeaders(cloudEvent);
var applicationProperties = MapHeaders(cloudEvent, prefix);
RestrictedDescribed bodySection;
Properties properties;

Expand Down Expand Up @@ -181,11 +217,11 @@ public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode cont
};
}

private static ApplicationProperties MapHeaders(CloudEvent cloudEvent)
private static ApplicationProperties MapHeaders(CloudEvent cloudEvent, string prefix)
{
var applicationProperties = new ApplicationProperties();
var properties = applicationProperties.Map;
properties.Add(SpecVersionAmqpHeader, cloudEvent.SpecVersion.VersionId);
properties.Add(prefix + SpecVersionAttributeName, cloudEvent.SpecVersion.VersionId);

foreach (var pair in cloudEvent.GetPopulatedAttributes())
{
Expand All @@ -197,7 +233,7 @@ private static ApplicationProperties MapHeaders(CloudEvent cloudEvent)
continue;
}

string propKey = AmqpHeaderPrefix + attribute.Name;
string propKey = prefix + attribute.Name;

// TODO: Check that AMQP can handle byte[], bool and int values
object propValue = pair.Value switch
Expand Down
112 changes: 58 additions & 54 deletions test/CloudNative.CloudEvents.UnitTests/Amqp/AmqpTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Amqp;
using Amqp.Framing;
using CloudNative.CloudEvents.NewtonsoftJson;
using Newtonsoft.Json.Linq;
using System;
using System.Net.Mime;
using System.Text;
Expand All @@ -20,71 +19,26 @@ public class AmqpTest
public void AmqpStructuredMessageTest()
{
// The AMQPNetLite library is factored such that we don't need to do a wire test here.
var cloudEvent = new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull"),
Subject = "123",
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};

var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessage(ContentMode.Structured, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());
var encodedAmqpMessage = message.Encode();

var message1 = Message.Decode(encodedAmqpMessage);
Assert.True(message1.IsCloudEvent());
var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull"), receivedCloudEvent.Source);
Assert.Equal("123", receivedCloudEvent.Subject);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);

Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
AssertDecodeThenEqual(cloudEvent, message);
}

[Fact]
public void AmqpBinaryMessageTest()
{
// The AMQPNetLite library is factored such that we don't need to do a wire test here.
var cloudEvent = new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Subject = "123",
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};

var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());
var encodedAmqpMessage = message.Encode();

var message1 = Message.Decode(encodedAmqpMessage);
Assert.True(message1.IsCloudEvent());
var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);

Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
AssertCloudEventsEqual(cloudEvent, receivedCloudEvent);
}

[Fact]
Expand All @@ -108,9 +62,7 @@ public void AmqpNormalizesTimestampsToUtc()
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Id = "A234-1234-1234",
// 2018-04-05T18:31:00+01:00 => 2018-04-05T17:31:00Z
Time = new DateTimeOffset(2018, 4, 5, 18, 31, 0, TimeSpan.FromHours(1)),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>"
Time = new DateTimeOffset(2018, 4, 5, 18, 31, 0, TimeSpan.FromHours(1))
};

var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
Expand All @@ -134,5 +86,57 @@ public void EncodeTextDataInBinaryMode_PopulatesDataProperty()
var text = Encoding.UTF8.GetString(body.Binary);
Assert.Equal("some text", text);
}

[Fact]
public void DefaultPrefix()
{
var cloudEvent = CreateSampleCloudEvent();

var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
Assert.Equal(cloudEvent.Id, message.ApplicationProperties["cloudEvents:id"]);
AssertDecodeThenEqual(cloudEvent, message);
}

[Fact]
public void UnderscorePrefix()
{
var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessageWithUnderscorePrefix(ContentMode.Binary, new JsonEventFormatter());
Assert.Equal(cloudEvent.Id, message.ApplicationProperties["cloudEvents_id"]);
AssertDecodeThenEqual(cloudEvent, message);
}

[Fact]
public void ColonPrefix()
{
var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessageWithColonPrefix(ContentMode.Binary, new JsonEventFormatter());
Assert.Equal(cloudEvent.Id, message.ApplicationProperties["cloudEvents:id"]);
AssertDecodeThenEqual(cloudEvent, message);
}

private void AssertDecodeThenEqual(CloudEvent cloudEvent, Message message)
{
var encodedAmqpMessage = message.Encode();

var message1 = Message.Decode(encodedAmqpMessage);
var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());
AssertCloudEventsEqual(cloudEvent, receivedCloudEvent);
}

/// <summary>
/// Returns a CloudEvent with XML data and an extension.
/// </summary>
private static CloudEvent CreateSampleCloudEvent() => new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull"),
Subject = "123",
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};
}
}