Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: CloudConnection did not forward close() call to cloud proxy #4546

Merged
merged 5 commits into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix: CouldConnection did not forward close() call to cloud proxy
  • Loading branch information
vipeller committed Mar 4, 2021
commit a31272f68734cca2ba0177acdbfe97466ca65d13
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ public class MqttBridgeEventIds
public const int SubscriptionChangeHandler = EventIdStart + 450;
public const int MessageConfirmingHandler = EventIdStart + 500;
public const int BrokeredCloudProxyDispatcher = EventIdStart + 550;
public const int BrokeredCloudConnection = EventIdStart + 600;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,58 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

public class BrokeredCloudConnection : ICloudConnection
{
IIdentity identity;

public BrokeredCloudConnection(BrokeredCloudProxy cloudProxy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not a ICloudProxy? We should avoid passing in actual types.. if possible.

{
Preconditions.CheckNotNull(cloudProxy);

this.IsActive = true;
this.CloudProxy = Option.Some(cloudProxy as ICloudProxy);
this.identity = cloudProxy.Identity;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems odd... can we just pass in the Identity separately, like we do for CloudConnection?

}

public Option<ICloudProxy> CloudProxy { get; }

public bool IsActive { get; private set; }

public Task<bool> CloseAsync()
public async Task<bool> CloseAsync()
{
// TODO, in order to tear down the connection higher level,
// it should unsubscribe from all topics
this.IsActive = false;
return Task.FromResult(true);
var result = default(bool);

try
{
result = await this.CloudProxy.Match(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: suggest: .Map().GetOrElse(true)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test?

cp => cp.CloseAsync(),
() => Task.FromResult(true));
}
catch (Exception e)
{
result = false;
Events.ErrorClosingCloudProxy(e, this.identity.Id);
}

return result;
}

static class Events
{
const int IdStart = MqttBridgeEventIds.BrokeredCloudConnection;
static readonly ILogger Log = Logger.Factory.CreateLogger<BrokeredCloudConnection>();

enum EventIds
{
ErrorClosingCloudProxy = IdStart,
}

public static void ErrorClosingCloudProxy(Exception e, string id) => Log.LogError((int)EventIds.ErrorClosingCloudProxy, e, $"Error closing cloud-proxy for {id}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter
public class BrokeredCloudProxy : ICloudProxy
{
BrokeredCloudProxyDispatcher cloudProxyDispatcher;
IIdentity identity;
Action<string, CloudConnectionStatus> connectionStatusChangedHandler;

AtomicBoolean isActive = new AtomicBoolean(true);
AtomicBoolean twinNeedsSubscribe = new AtomicBoolean(true);

public BrokeredCloudProxy(IIdentity identity, BrokeredCloudProxyDispatcher cloudProxyDispatcher, Action<string, CloudConnectionStatus> connectionStatusChangedHandler)
{
this.identity = Preconditions.CheckNotNull(identity);
this.Identity = Preconditions.CheckNotNull(identity);
vipeller marked this conversation as resolved.
Show resolved Hide resolved
this.cloudProxyDispatcher = Preconditions.CheckNotNull(cloudProxyDispatcher);

this.connectionStatusChangedHandler = connectionStatusChangedHandler;
this.cloudProxyDispatcher.ConnectionStatusChangedEvent += this.ConnectionChangedEventHandler;
}

public bool IsActive => this.isActive;
public IIdentity Identity { get; }

public Task<bool> CloseAsync()
{
Expand All @@ -40,35 +40,35 @@ public Task<bool> CloseAsync()
}

public Task<bool> OpenAsync() => Task.FromResult(true);
public Task RemoveCallMethodAsync() => this.cloudProxyDispatcher.RemoveCallMethodAsync(this.identity);
public Task RemoveDesiredPropertyUpdatesAsync() => this.cloudProxyDispatcher.RemoveDesiredPropertyUpdatesAsync(this.identity);
public Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feedbackStatus) => this.cloudProxyDispatcher.SendFeedbackMessageAsync(this.identity, messageId, feedbackStatus);
public Task SendMessageAsync(IMessage message) => this.cloudProxyDispatcher.SendMessageAsync(this.identity, message);
public Task SendMessageBatchAsync(IEnumerable<IMessage> inputMessages) => this.cloudProxyDispatcher.SendMessageBatchAsync(this.identity, inputMessages);
public Task SetupCallMethodAsync() => this.cloudProxyDispatcher.SetupCallMethodAsync(this.identity);
public Task SetupDesiredPropertyUpdatesAsync() => this.cloudProxyDispatcher.SetupDesiredPropertyUpdatesAsync(this.identity);
public Task StartListening() => this.cloudProxyDispatcher.StartListening(this.identity);
public Task StopListening() => this.cloudProxyDispatcher.StopListening(this.identity);
public Task RemoveCallMethodAsync() => this.cloudProxyDispatcher.RemoveCallMethodAsync(this.Identity);
public Task RemoveDesiredPropertyUpdatesAsync() => this.cloudProxyDispatcher.RemoveDesiredPropertyUpdatesAsync(this.Identity);
public Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feedbackStatus) => this.cloudProxyDispatcher.SendFeedbackMessageAsync(this.Identity, messageId, feedbackStatus);
public Task SendMessageAsync(IMessage message) => this.cloudProxyDispatcher.SendMessageAsync(this.Identity, message);
public Task SendMessageBatchAsync(IEnumerable<IMessage> inputMessages) => this.cloudProxyDispatcher.SendMessageBatchAsync(this.Identity, inputMessages);
public Task SetupCallMethodAsync() => this.cloudProxyDispatcher.SetupCallMethodAsync(this.Identity);
public Task SetupDesiredPropertyUpdatesAsync() => this.cloudProxyDispatcher.SetupDesiredPropertyUpdatesAsync(this.Identity);
public Task StartListening() => this.cloudProxyDispatcher.StartListening(this.Identity);
public Task StopListening() => this.cloudProxyDispatcher.StopListening(this.Identity);

public Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessage)
{
return this.cloudProxyDispatcher.UpdateReportedPropertiesAsync(this.identity, reportedPropertiesMessage, this.twinNeedsSubscribe.GetAndSet(false));
return this.cloudProxyDispatcher.UpdateReportedPropertiesAsync(this.Identity, reportedPropertiesMessage, this.twinNeedsSubscribe.GetAndSet(false));
}

public Task<IMessage> GetTwinAsync()
{
return this.cloudProxyDispatcher.GetTwinAsync(this.identity, this.twinNeedsSubscribe.GetAndSet(false));
return this.cloudProxyDispatcher.GetTwinAsync(this.Identity, this.twinNeedsSubscribe.GetAndSet(false));
}

public Task RemoveTwinResponseAsync()
{
this.twinNeedsSubscribe.Set(true);
return this.cloudProxyDispatcher.RemoveTwinResponseAsync(this.identity);
return this.cloudProxyDispatcher.RemoveTwinResponseAsync(this.Identity);
}

void ConnectionChangedEventHandler(CloudConnectionStatus cloudConnectionStatus)
{
this.connectionStatusChangedHandler(this.identity.Id, cloudConnectionStatus);
this.connectionStatusChangedHandler(this.Identity.Id, cloudConnectionStatus);
}
}
}