Skip to content

Commit

Permalink
Added test to see if edgeHub keeps track on connected devices and the…
Browse files Browse the repository at this point in the history
…ir subscriptions (Azure#4381)
  • Loading branch information
vipeller authored Feb 23, 2021
1 parent a027808 commit 2418769
Show file tree
Hide file tree
Showing 4 changed files with 618 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class DirectMethodHandler : IDirectMethodHandler, IMessageConsumer, IMess
const string MethodPostIndirectDevice = "$iothub/+/methods/res/#";

const string MethodSubscriptionForPostPattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/methods/post/\#$";
const string MethodResponsePattern = @"^((\$edgehub)|(\$iothub))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/methods/res/(?<res>\d+)/\?\$rid=(?<rid>.+)";
const string MethodResponsePattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/methods/res/(?<res>\d+)/\?\$rid=(?<rid>.+)";

const string MethodCallToDeviceTopicTemplate = "{0}/{1}/methods/post/{2}/?$rid={3}";
const string MethodCallToModuleTopicTemplate = "{0}/{1}/{2}/methods/post/{3}/?$rid={4}";
Expand Down Expand Up @@ -85,11 +85,13 @@ async Task<bool> HandleMethodResponse(Match match, MqttPublishInfo publishInfo)
var rid = match.Groups["rid"];
var res = match.Groups["res"];

var isDirect = string.Equals(match.Groups["dialect"].Value, MqttBrokerAdapterConstants.DirectTopicPrefix);

var identity = id2.Success
? this.identityProvider.Create(id1.Value, id2.Value)
: this.identityProvider.Create(id1.Value);

var maybeListener = await this.connectionRegistry.GetOrCreateDeviceListenerAsync(identity);
var maybeListener = await this.connectionRegistry.GetOrCreateDeviceListenerAsync(identity, isDirect);
var listener = default(IDeviceListener);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class TelemetryHandler : IMessageConsumer
const string TelemetryIndirectDevice = "$iothub/+/messages/events/#";
const string TelemetryIndirectModule = "$iothub/+/+/messages/events/#";

const string TelemetryPublishPattern = @"^((\$edgehub)|(\$iothub))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/messages/events(/(?<bag>.*))?";
const string TelemetryPublishPattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/messages/events(/(?<bag>.*))?";

static readonly string[] subscriptions = new[] { TelemetryDirectDevice, TelemetryDirectModule, TelemetryIndirectDevice, TelemetryIndirectModule };

Expand Down Expand Up @@ -53,11 +53,13 @@ async Task<bool> HandleTelemetry(Match match, MqttPublishInfo publishInfo)
var id2 = match.Groups["id2"];
var bag = match.Groups["bag"];

var isDirect = string.Equals(match.Groups["dialect"].Value, MqttBrokerAdapterConstants.DirectTopicPrefix);

var identity = id2.Success
? this.identityProvider.Create(id1.Value, id2.Value)
: this.identityProvider.Create(id1.Value);

var maybeListener = await this.connectionRegistry.GetOrCreateDeviceListenerAsync(identity);
var maybeListener = await this.connectionRegistry.GetOrCreateDeviceListenerAsync(identity, isDirect);
var listener = default(IDeviceListener);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class TwinHandler : ITwinHandler, IMessageConsumer, IMessageProducer
const string TwinUpdateIndirectDevice = "$iothub/+/twin/reported/#";
const string TwinUpdateIndirectModule = "$iothub/+/+/twin/reported/#";

const string TwinGetPublishPattern = @"^((\$edgehub)|(\$iothub))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/twin/get/\?\$rid=(?<rid>.+)";
const string TwinUpdatePublishPattern = @"^((\$edgehub)|(\$iothub))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/twin/reported/\?\$rid=(?<rid>.+)";
const string TwinGetPublishPattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/twin/get/\?\$rid=(?<rid>.+)";
const string TwinUpdatePublishPattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/twin/reported/\?\$rid=(?<rid>.+)";

const string TwinSubscriptionForResultsPattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/twin/res/\#$";
const string TwinSubscriptionForPatchPattern = @"^((?<dialect>(\$edgehub)|(\$iothub)))/(?<id1>[^/\+\#]+)(/(?<id2>[^/\+\#]+))?/twin/desired/\#$";
Expand Down Expand Up @@ -183,11 +183,13 @@ async Task<bool> HandleUpstreamRequest(Func<IDeviceListener, string, Task> actio
var id2 = match.Groups["id2"];
var rid = match.Groups["rid"];

var isDirect = string.Equals(match.Groups["dialect"].Value, MqttBrokerAdapterConstants.DirectTopicPrefix);

var identity = id2.Success
? this.identityProvider.Create(id1.Value, id2.Value)
: this.identityProvider.Create(id1.Value);

var maybeListener = await this.connectionRegistry.GetOrCreateDeviceListenerAsync(identity);
var maybeListener = await this.connectionRegistry.GetOrCreateDeviceListenerAsync(identity, isDirect);
var listener = default(IDeviceListener);

try
Expand Down
Loading

0 comments on commit 2418769

Please sign in to comment.