Skip to content

Commit

Permalink
Send connection device Id information on twin change notifications #4762
Browse files Browse the repository at this point in the history
 (#4888)

Cherry-pick: fa60e52

Issue: EdgeHub supports routing twin change notifications, which basically includes routing RP updates to other modules. The message that gets routed should have information about the device that sent the RP update.
But because of a recent change in EH, this information got dropped.
#4689

Fix: Fix is to route the information about the device/module that sent the RP update to the receiving module.
  • Loading branch information
varunpuranik authored May 1, 2021
1 parent 09d243e commit cd39064
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,20 @@ public AmqpMessage FromMessage(IMessage message)
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsInputNameKey] = inputName;
}

if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionDeviceId, out string connectionDeviceId))
if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.RpConnectionDeviceIdInternal, out string rpConnectionDeviceId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionDeviceId] = rpConnectionDeviceId;
}
else if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionDeviceId, out string connectionDeviceId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionDeviceId] = connectionDeviceId;
}

if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionModuleId, out string connectionModuleId))
if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.RpConnectionModuleIdInternal, out string rpConnectionModuleId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionModuleId] = rpConnectionModuleId;
}
else if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionModuleId, out string connectionModuleId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionModuleId] = connectionModuleId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public static class SystemProperties
public const string InterfaceId = "iothub-interface-id";
public const string ModelId = "modelId";

public const string RpConnectionDeviceIdInternal = "rpSenderDeviceId";
public const string RpConnectionModuleIdInternal = "rpSenderModuleId";

public static readonly Dictionary<string, string> IncomingSystemPropertiesMap = new Dictionary<string, string>
{
{ OnTheWireSystemPropertyNames.ExpiryTimeUtcOnTheWireName, ExpiryTimeUtc },
Expand Down Expand Up @@ -67,7 +70,9 @@ public static class SystemProperties
{ Operation, OnTheWireSystemPropertyNames.OperationOnTheWireName },
{ CreationTime, OnTheWireSystemPropertyNames.CreationTimeOnTheWireName },
{ ConnectionDeviceId, OnTheWireSystemPropertyNames.ConnectionDeviceIdOnTheWireName },
{ ConnectionModuleId, OnTheWireSystemPropertyNames.ConnectionModuleIdOnTheWireName }
{ ConnectionModuleId, OnTheWireSystemPropertyNames.ConnectionModuleIdOnTheWireName },
{ RpConnectionDeviceIdInternal, OnTheWireSystemPropertyNames.ConnectionDeviceIdOnTheWireName },
{ RpConnectionModuleIdInternal, OnTheWireSystemPropertyNames.ConnectionModuleIdOnTheWireName }
};

static class OnTheWireSystemPropertyNames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,17 @@ public async Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessa
switch (this.Identity)
{
case IModuleIdentity moduleIdentity:
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = moduleIdentity.DeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionModuleId] = moduleIdentity.ModuleId;
reportedPropertiesMessage.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal] = moduleIdentity.DeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.RpConnectionModuleIdInternal] = moduleIdentity.ModuleId;
break;
case IDeviceIdentity deviceIdentity:
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = deviceIdentity.DeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal] = deviceIdentity.DeviceId;
break;
}

reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = this.edgeHub.GetEdgeDeviceId();
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionModuleId] = Constants.EdgeHubModuleId;

try
{
using (Metrics.TimeReportedPropertiesUpdate(this.Identity.Id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ public Task UpdateReportedPropertiesAsync(IIdentity identity, IMessage reportedP
Events.UpdateReportedPropertiesReceived(identity);
Task cloudSendMessageTask = this.twinManager.UpdateReportedPropertiesAsync(identity.Id, reportedPropertiesMessage);

reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = this.edgeDeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionModuleId] = this.edgeModuleId;

IRoutingMessage routingMessage = this.ProcessMessageInternal(reportedPropertiesMessage, false);
Task routingSendMessageTask = this.router.RouteAsync(routingMessage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ public IProtocolGatewayMessage FromMessage(IMessage message)
}
}

if (message.SystemProperties.TryGetValue(SystemProperties.RpConnectionDeviceIdInternal, out string rpDeviceId))
{
properties[SystemProperties.OutgoingSystemPropertiesMap[SystemProperties.RpConnectionDeviceIdInternal]] = rpDeviceId;
}

if (message.SystemProperties.TryGetValue(SystemProperties.RpConnectionModuleIdInternal, out string rpModuleId))
{
properties[SystemProperties.OutgoingSystemPropertiesMap[SystemProperties.RpConnectionModuleIdInternal]] = rpModuleId;
}

if (!this.addressConvertor.TryBuildProtocolAddressFromEdgeHubMessage(uriTemplateKey, message, properties, out string address))
{
throw new InvalidOperationException("Could not derive destination address using message system properties");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,5 +343,31 @@ byte[] GetMessageBody(AmqpMessage sourceMessage)
Assert.Equal("Value2", amqpMessage.ApplicationProperties.Map["Prop2"].ToString());
}
}

[Fact]
public void TestConnectionDeviceIdTest()
{
// Setup
var systemProperties = new Dictionary<string, string>
{
[SystemProperties.ConnectionDeviceId] = "edgeDeviceId",
[SystemProperties.ConnectionModuleId] = "$edgeHub",
[SystemProperties.RpConnectionDeviceIdInternal] = "leafDevice1",
[SystemProperties.RpConnectionModuleIdInternal] = "leafModule1",
};

byte[] bytes = { 1, 2, 3, 4 };

var message = new EdgeMessage(bytes, new Dictionary<string, string>(), systemProperties);
var messageConverter = new AmqpMessageConverter();

// Act
using (AmqpMessage amqpMessage = messageConverter.FromMessage(message))
{
// Assert
Assert.Equal("leafDevice1", amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionDeviceId]);
Assert.Equal("leafModule1", amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionModuleId]);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public async Task ForwardsTwinPatchOperationToTheCloudProxy()
edgeHub.Setup(e => e.UpdateReportedPropertiesAsync(It.IsAny<IIdentity>(), It.IsAny<IMessage>()))
.Callback<IIdentity, IMessage>((id, m) => receivedMessage = m)
.Returns(Task.CompletedTask);
edgeHub.Setup(e => e.GetEdgeDeviceId()).Returns("edgeDeviceId1");
Mock.Get(connMgr).Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy)));
var listener = new DeviceMessageHandler(identity, edgeHub.Object, connMgr, DefaultMessageAckTimeout, Option.None<string>());
var underlyingDeviceProxy = new Mock<IDeviceProxy>();
Expand All @@ -126,8 +127,10 @@ public async Task ForwardsTwinPatchOperationToTheCloudProxy()
Assert.NotNull(receivedMessage);
Assert.Equal(Constants.TwinChangeNotificationMessageSchema, receivedMessage.SystemProperties[SystemProperties.MessageSchema]);
Assert.Equal(Constants.TwinChangeNotificationMessageType, receivedMessage.SystemProperties[SystemProperties.MessageType]);
Assert.Equal("device1", receivedMessage.SystemProperties[SystemProperties.ConnectionDeviceId]);
Assert.Equal("module1", receivedMessage.SystemProperties[SystemProperties.ConnectionModuleId]);
Assert.Equal("edgeDeviceId1", receivedMessage.SystemProperties[SystemProperties.ConnectionDeviceId]);
Assert.Equal("$edgeHub", receivedMessage.SystemProperties[SystemProperties.ConnectionModuleId]);
Assert.Equal("device1", receivedMessage.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal]);
Assert.Equal("module1", receivedMessage.SystemProperties[SystemProperties.RpConnectionModuleIdInternal]);
Assert.True(receivedMessage.SystemProperties.ContainsKey(SystemProperties.EnqueuedTime));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class RoutingTest
{
static readonly TimeSpan DefaultMessageAckTimeout = TimeSpan.FromSeconds(30);
static readonly Random Rand = new Random();
static readonly string edgeHubModuleId = "$edgeHub";

[Fact]
public async Task RouteToCloudTest()
Expand Down Expand Up @@ -379,10 +378,10 @@ public async Task ReportedPropertyUpdatesAsTelemetryTest()
await device1.UpdateReportedProperties(message);
await Task.Delay(GetSleepTime());

Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, edgeHubModuleId));
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, Constants.EdgeHubModuleId));
}

[Fact(Skip = "Flaky test, bug #2494150")]
[Fact]
public async Task TestRoutingTwinChangeNotificationFromDevice()
{
var routes = new List<string>
Expand All @@ -400,11 +399,11 @@ public async Task TestRoutingTwinChangeNotificationFromDevice()
IMessage message = GetReportedPropertiesMessage();
await device1.UpdateReportedProperties(message);
await Task.Delay(GetSleepTime());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, edgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, Constants.EdgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification("device1", null));
}

[Fact(Skip = "Flaky test, bug #2494150")]
[Fact]
public async Task TestRoutingTwinChangeNotificationFromModule()
{
var routes = new List<string>
Expand All @@ -422,8 +421,8 @@ public async Task TestRoutingTwinChangeNotificationFromModule()
IMessage message = GetReportedPropertiesMessage();
await module2.UpdateReportedProperties(message);
await Task.Delay(GetSleepTime());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, edgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, Constants.EdgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification(edgeDeviceId, "mod2"));
}

// Need longer sleep when run tests in parallel
Expand Down Expand Up @@ -467,7 +466,7 @@ public async Task TestRoutingTwinChangeNotificationFromModule()
ITwinManager twinManager = new TwinManager(connectionManager, new TwinCollectionMessageConverter(), new TwinMessageConverter(), Option.None<IEntityStore<string, TwinInfo>>());
var invokeMethodHandler = Mock.Of<IInvokeMethodHandler>();
var subscriptionProcessor = new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager);
IEdgeHub edgeHub = new RoutingEdgeHub(router, routingMessageConverter, connectionManager, twinManager, edgeDeviceId, edgeHubModuleId, invokeMethodHandler, subscriptionProcessor, Mock.Of<IDeviceScopeIdentitiesCache>());
IEdgeHub edgeHub = new RoutingEdgeHub(router, routingMessageConverter, connectionManager, twinManager, edgeDeviceId, Constants.EdgeHubModuleId, invokeMethodHandler, subscriptionProcessor, Mock.Of<IDeviceScopeIdentitiesCache>());
return (edgeHub, connectionManager);
}

Expand Down Expand Up @@ -638,9 +637,11 @@ public bool HasReceivedMessage(IMessage message) => this.receivedMessages.Any(
public Task UpdateReportedProperties(IMessage reportedPropertiesMessage) =>
this.deviceListener.UpdateReportedPropertiesAsync(reportedPropertiesMessage, Guid.NewGuid().ToString());

public bool HasReceivedTwinChangeNotification() => this.receivedMessages.Any(
public bool HasReceivedTwinChangeNotification(string connDeviceId, string connModuleId) => this.receivedMessages.Any(
m =>
m.SystemProperties[SystemProperties.MessageType] == Constants.TwinChangeNotificationMessageType);
m.SystemProperties[SystemProperties.MessageType] == Constants.TwinChangeNotificationMessageType
&& m.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal] == connDeviceId
&& (string.IsNullOrWhiteSpace(connModuleId) || m.SystemProperties[SystemProperties.RpConnectionModuleIdInternal] == connModuleId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,5 +347,62 @@ public void TestToMessage_NoTopicMatch()
var protocolGatewayMessageConverter = new ProtocolGatewayMessageConverter(converter, ByteBufferConverter);
Assert.Throws<InvalidOperationException>(() => protocolGatewayMessageConverter.ToMessage(protocolGatewayMessage));
}

[Fact]
public void RpSenderTest()
{
// Setup
const string DeviceId = "Device1";
const string ModuleId = "Module1";
const string Input = "input1";
var outputTemplates = new Dictionary<string, string>
{
["ModuleEndpoint"] = "devices/{deviceId}/modules/{moduleId}/inputs/{inputName}"
};
var inputTemplates = new List<string>
{
"devices/{deviceId}/messages/events/{params}/"
};
var config = new MessageAddressConversionConfiguration(
inputTemplates,
outputTemplates);
var converter = new MessageAddressConverter(config);

var properties = new Dictionary<string, string>();

var systemProperties = new Dictionary<string, string>
{
[SystemProperties.OutboundUri] = Constants.OutboundUriModuleEndpoint,
[SystemProperties.LockToken] = Guid.NewGuid().ToString(),
[TemplateParameters.DeviceIdTemplateParam] = DeviceId,
[Constants.ModuleIdTemplateParameter] = ModuleId,
[SystemProperties.InputName] = Input,
[SystemProperties.OutputName] = "output",
[SystemProperties.ContentEncoding] = "utf-8",
[SystemProperties.ContentType] = "application/json",

[SystemProperties.ConnectionDeviceId] = "edgeDevice1",
[SystemProperties.ConnectionModuleId] = "$edgeHub",
[SystemProperties.RpConnectionDeviceIdInternal] = "leafDevice1",
[SystemProperties.RpConnectionModuleIdInternal] = "leafModule1",
};

var message = Mock.Of<IMessage>(
m =>
m.Body == new byte[] { 1, 2, 3 } &&
m.Properties == properties &&
m.SystemProperties == systemProperties);

var protocolGatewayMessageConverter = new ProtocolGatewayMessageConverter(converter, ByteBufferConverter);

// Act
IProtocolGatewayMessage pgMessage = protocolGatewayMessageConverter.FromMessage(message);

// Verify
Assert.NotNull(pgMessage);
Assert.Equal(@"devices/Device1/modules/Module1/inputs/input1/%24.ce=utf-8&%24.ct=application%2Fjson&%24.cdid=leafDevice1&%24.cmid=leafModule1", pgMessage.Address);
Assert.Equal("leafDevice1", pgMessage.Properties["$.cdid"]);
Assert.Equal("leafModule1", pgMessage.Properties["$.cmid"]);
}
}
}

0 comments on commit cd39064

Please sign in to comment.