Skip to content

Commit

Permalink
Fix: CloudConnection did not forward close() call to cloud proxy (Azu…
Browse files Browse the repository at this point in the history
…re#4546)

The main change is in BrokeredCloudConnection.cs where it did not forward the call to CloudProxy. The rest of the change is only about to being able to log the Identity in case of failure.
The removed "TODO" about the subscription is not needed anymore as the proper place was in ConnectionHander.cs/RemoveConnectionAsync() to remove existing subscriptions.
  • Loading branch information
vipeller authored Mar 5, 2021
1 parent d51e139 commit 6f3f8ec
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ public class MqttBridgeEventIds
public const int SubscriptionChangeHandler = EventIdStart + 450;
public const int MessageConfirmingHandler = EventIdStart + 500;
public const int BrokeredCloudProxyDispatcher = EventIdStart + 550;
public const int BrokeredCloudConnection = EventIdStart + 600;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,57 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

public class BrokeredCloudConnection : ICloudConnection
{
public BrokeredCloudConnection(BrokeredCloudProxy cloudProxy)
readonly IIdentity identity;

public BrokeredCloudConnection(IIdentity identity, ICloudProxy cloudProxy)
{
this.IsActive = true;
this.CloudProxy = Option.Some(cloudProxy as ICloudProxy);
Preconditions.CheckNotNull(identity);
Preconditions.CheckNotNull(cloudProxy);

this.identity = identity;
this.CloudProxy = Option.Some(cloudProxy);
}

public Option<ICloudProxy> CloudProxy { get; }

public bool IsActive { get; private set; }
public bool IsActive => this.CloudProxy.Map(cp => cp.IsActive).GetOrElse(false);

public Task<bool> CloseAsync()
public async Task<bool> CloseAsync()
{
// TODO, in order to tear down the connection higher level,
// it should unsubscribe from all topics
this.IsActive = false;
return Task.FromResult(true);
var result = default(bool);

try
{
result = await this.CloudProxy
.Map(cp => cp.CloseAsync())
.GetOrElse(Task.FromResult(true));
}
catch (Exception e)
{
result = false;
Events.ErrorClosingCloudProxy(e, this.identity.Id);
}

return result;
}

static class Events
{
const int IdStart = MqttBridgeEventIds.BrokeredCloudConnection;
static readonly ILogger Log = Logger.Factory.CreateLogger<BrokeredCloudConnection>();

enum EventIds
{
ErrorClosingCloudProxy = IdStart,
}

public static void ErrorClosingCloudProxy(Exception e, string id) => Log.LogError((int)EventIds.ErrorClosingCloudProxy, e, $"Error closing cloud-proxy for {id}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task<Try<ICloudConnection>> Connect(IIdentity identity, Action<stri
}

var cloudProxy = new BrokeredCloudProxy(identity, this.cloudProxyDispatcher, connectionStatusChangedHandler);
return new Try<ICloudConnection>(new BrokeredCloudConnection(cloudProxy));
return new Try<ICloudConnection>(new BrokeredCloudConnection(identity, cloudProxy));
}

// The purpose of this method is to make less noise in logs when the broker
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test
{
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;
using Moq;
using Xunit;

public class BrokeredCloudConnectionTest
{
const string IotHubHostName = "somehub.azure-devices.net";
const string ConnectivityTopic = "$internal/connectivity";

[Fact]
public async Task AddRemoveDeviceConnection()
{
var deviceId = "some_device";
var deviceCredentials = new TokenCredentials(new DeviceIdentity(IotHubHostName, deviceId), "some_token", "some_product", Option.None<string>(), Option.None<string>(), false);

var (connectionManager, cloudProxyDispatcher) = SetupEnvironment();

await SignalConnected(cloudProxyDispatcher);

Try<ICloudProxy> cloudProxyTry = await connectionManager.CreateCloudConnectionAsync(deviceCredentials);
Assert.True(cloudProxyTry.Success);
Assert.True(cloudProxyTry.Value.IsActive);

await connectionManager.RemoveDeviceConnection(deviceId);
Assert.False(cloudProxyTry.Value.IsActive);
}

[Fact]
public async Task DisconnectRemovesConnections()
{
var deviceId = "some_device";
var deviceCredentials = new TokenCredentials(new DeviceIdentity(IotHubHostName, deviceId), "some_token", "some_product", Option.None<string>(), Option.None<string>(), false);

var (connectionManager, cloudProxyDispatcher) = SetupEnvironment();

await SignalConnected(cloudProxyDispatcher);

Try<ICloudProxy> cloudProxyTry = await connectionManager.CreateCloudConnectionAsync(deviceCredentials);
Assert.True(cloudProxyTry.Success);
Assert.True(cloudProxyTry.Value.IsActive);

await SignalDisconnected(cloudProxyDispatcher);

// After the signal triggered, it is async disconnecting cloud proxies, so give some time
await WaitUntil(() => !cloudProxyTry.Value.IsActive, TimeSpan.FromSeconds(5));

Assert.False(cloudProxyTry.Value.IsActive);
}

[Fact]
public async Task ConnectionEventHandlersMaintainedProperly()
{
// This test is motivated by an actual bug when new cloud proxies were added to
// connection (C#) events but then they never got removed, causing memory leak and
// triggering event handlers multiple times, as the old proxies kept hanging on
// the event.

var deviceId = "some_device";
var deviceCredentials = new TokenCredentials(new DeviceIdentity(IotHubHostName, deviceId), "some_token", "some_product", Option.None<string>(), Option.None<string>(), false);

var (connectionManager, cloudProxyDispatcher) = SetupEnvironment();

await SignalConnected(cloudProxyDispatcher);

Try<ICloudProxy> cloudProxyTry = await connectionManager.CreateCloudConnectionAsync(deviceCredentials);
Assert.True(cloudProxyTry.Success);
Assert.True(cloudProxyTry.Value.IsActive);

// Disconnect/Connect a few times, this generates new and new proxies.
var lastProxy = cloudProxyTry.Value.AsPrivateAccessible().InnerCloudProxy as BrokeredCloudProxy;
for (var i = 0; i < 3; i++)
{
await SignalDisconnected(cloudProxyDispatcher);

await WaitUntil(() => !lastProxy.IsActive, TimeSpan.FromSeconds(5));

// As the proxy went inactive, the subscription list must be empty
var subscribers = cloudProxyDispatcher.AsPrivateAccessible().ConnectionStatusChangedEvent as MulticastDelegate;
Assert.NotNull(subscribers);

var subscriberList = subscribers.GetInvocationList().Select(l => l.Target).OfType<BrokeredCloudProxy>().ToArray();
Assert.Empty(subscriberList);

// Bring up a new proxy
await SignalConnected(cloudProxyDispatcher);

var currentProxyTry = (await connectionManager.GetCloudConnection(deviceId)).GetOrElse((ICloudProxy)null);
Assert.NotNull(currentProxyTry);
Assert.True(currentProxyTry.IsActive);

var currentProxy = currentProxyTry.AsPrivateAccessible().InnerCloudProxy as BrokeredCloudProxy;

// Just to be sure that this is a new proxy (and not the old is re-activated)
Assert.NotEqual(lastProxy, currentProxy);

// The new proxy must have subscribed and it should be the only subscriber
subscribers = cloudProxyDispatcher.AsPrivateAccessible().ConnectionStatusChangedEvent as MulticastDelegate;
Assert.NotNull(subscribers);

subscriberList = subscribers.GetInvocationList().Select(l => l.Target).OfType<BrokeredCloudProxy>().ToArray();
Assert.Single(subscriberList);
Assert.Equal(currentProxy, subscriberList.First());

lastProxy = currentProxy;
}
}

Task SignalConnected(BrokeredCloudProxyDispatcher brokeredCloudProxyDispatcher) => SignalConnectionEvent(brokeredCloudProxyDispatcher, "Connected");
Task SignalDisconnected(BrokeredCloudProxyDispatcher brokeredCloudProxyDispatcher) => SignalConnectionEvent(brokeredCloudProxyDispatcher, "Disconnected");

async Task SignalConnectionEvent(BrokeredCloudProxyDispatcher brokeredCloudProxyDispatcher, string status)
{
var packet = new MqttPublishInfo(ConnectivityTopic, Encoding.UTF8.GetBytes("{\"status\":\"" + status + "\"}"));
await brokeredCloudProxyDispatcher.HandleAsync(packet);
}

(ConnectionManager, BrokeredCloudProxyDispatcher) SetupEnvironment()
{
var cloudProxyDispatcher = new BrokeredCloudProxyDispatcher();
cloudProxyDispatcher.SetConnector(Mock.Of<IMqttBrokerConnector>());

var cloudConnectionProvider = new BrokeredCloudConnectionProvider(cloudProxyDispatcher);
cloudConnectionProvider.BindEdgeHub(Mock.Of<IEdgeHub>());

var deviceConnectivityManager = new BrokeredDeviceConnectivityManager(cloudProxyDispatcher);

var connectionManager = new ConnectionManager(
cloudConnectionProvider,
Mock.Of<ICredentialsCache>(),
new IdentityProvider(IotHubHostName),
deviceConnectivityManager);

return (connectionManager, cloudProxyDispatcher);
}

async Task WaitUntil(Func<bool> condition, TimeSpan timeout)
{
var startTime = DateTime.Now;

while (!condition() && DateTime.Now - startTime < timeout)
{
await Task.Delay(100);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Dynamic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.CloudProxy;
Expand Down Expand Up @@ -504,87 +502,6 @@ public class NullAuthenticator : IAuthenticator
}
}

internal class PrivateAccessor : DynamicObject
{
private readonly object target;

internal PrivateAccessor(object target)
{
this.target = target;
}

public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result)
{
var type = this.target.GetType();
var methodInfo = type.GetMethod(binder.Name, BindingFlags.NonPublic | BindingFlags.Instance) ??
type.GetMethod(binder.Name); // sometimes we call public method on non-public class

if (methodInfo != null)
{
result = methodInfo.Invoke(this.target, args);
return true;
}
else
{
result = null;
return false;
}
}

public override bool TrySetMember(SetMemberBinder binder, object value)
{
var type = this.target.GetType();
var propertyInfo = type.GetProperty(binder.Name, BindingFlags.NonPublic | BindingFlags.Instance) ??
type.GetProperty(binder.Name); // when getter is public, this returns private setter

if (propertyInfo != null)
{
propertyInfo.SetValue(this.target, value);
return true;
}
else
{
return false;
}
}

public override bool TryGetMember(GetMemberBinder binder, out object result)
{
var type = this.target.GetType();
var fieldInfo = type.GetField(binder.Name, BindingFlags.NonPublic | BindingFlags.Instance);

if (fieldInfo != null)
{
result = fieldInfo.GetValue(this.target);
return true;
}
else
{
// try with properties
var propertyInfo = type.GetProperty(binder.Name, BindingFlags.NonPublic | BindingFlags.Instance) ??
type.GetProperty(binder.Name);

if (propertyInfo != null)
{
result = propertyInfo.GetValue(this.target);
return true;
}

result = null;
return false;
}
}
}

internal static class DynamicPrivateAccessor
{
/// <summary>
/// This is for test scenarios only! The purpose of this method is to make it possible to set up
/// or read an object state for testing purposes. Never use this for production code!
/// </summary>
internal static dynamic AsPrivateAccessible(this object target) => new PrivateAccessor(target);
}

static class ListShuffleExtensions
{
public static void Shuffle<T>(this IList<T> list, Random rnd)
Expand Down
Loading

0 comments on commit 6f3f8ec

Please sign in to comment.