Skip to content

Commit

Permalink
Fix: detect fail-over from iot hub/sdk behavior and disconnect from h…
Browse files Browse the repository at this point in the history
…ub (#5718)

IoT Hub does not disconnect clients in case of a fail-over, so EdgeHub keeps sending messages/operations to it. These operations fail with a specific error and never recover.
Added logic to filter exceptions so that, when the error thrown by the "old" server (the server that was serving before the fail-over) arrives, EdgeHub disconnects and the next connection attempt finds the new server.

cherry pick of #5669
  • Loading branch information
vipeller authored Oct 21, 2021
1 parent 2391cd9 commit 52c563a
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ Task HandleException(Exception ex)
Events.HandleNre(ex, this);
return this.CloseAsync();
}
else if (ex.IsFailOver())
{
Events.FailOverDetected(ex, this);
return this.CloseAsync();
}
}
catch (Exception e)
{
Expand Down Expand Up @@ -599,7 +604,8 @@ enum EventIds
ErrorUpdatingReportedProperties,
ErrorSendingFeedbackMessageAsync,
ErrorGettingTwin,
HandleNre
HandleNre,
FailOverDetected
}

public static void Closed(CloudProxy cloudProxy)
Expand Down Expand Up @@ -712,6 +718,11 @@ public static void HandleNre(Exception ex, CloudProxy cloudProxy)
Log.LogDebug((int)EventIds.HandleNre, ex, Invariant($"Got a non-recoverable error from client for {cloudProxy.clientId}. Closing the cloud proxy since it may be in a bad state."));
}

/// <summary>
/// Writes an information-level log entry, tagged with <see cref="EventIds.FailOverDetected"/>,
/// stating that a fail-over was detected for the given cloud proxy's client.
/// </summary>
/// <param name="ex">Exception handed to the logger together with the message.</param>
/// <param name="cloudProxy">Cloud proxy whose client id is embedded in the message.</param>
public static void FailOverDetected(Exception ex, CloudProxy cloudProxy)
{
    // Build the culture-invariant text first so the logging call itself stays short.
    string logMessage = Invariant($"Fail-over detected, closing cloud proxy for {cloudProxy.clientId}.");
    Log.LogInformation((int)EventIds.FailOverDetected, ex, logMessage);
}

internal static void ExceptionInHandleException(CloudProxy cloudProxy, Exception handlingException, Exception caughtException)
{
Log.LogDebug((int)EventIds.ExceptionInHandleException, Invariant($"Cloud proxy {cloudProxy.id} got exception {caughtException} while handling exception {handlingException}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ async Task<T> InvokeFunc<T>(Func<Task<T>> func, string operation, bool useForCon
await this.deviceConnectivityManager.CallTimedOut();
}
}
else if (ex.IsFailOver())
{
Events.FailOverDetected(this.identity, operation, mappedException);
if (useForConnectivityCheck)
{
await this.deviceConnectivityManager.CallTimedOut();
}
}
else
{
Events.OperationFailed(this.identity, operation, mappedException);
Expand Down Expand Up @@ -199,7 +207,8 @@ enum EventIds
OperationTimedOut,
OperationFailed,
OperationSucceeded,
ChangingStatus
ChangingStatus,
FailOverDetected
}

public static void ReceivedDeviceSdkCallback(IIdentity identity, ConnectionStatus status, ConnectionStatusChangeReason reason)
Expand All @@ -226,6 +235,11 @@ public static void ChangingStatus(AtomicBoolean isConnected, IIdentity identity)
{
Log.LogInformation((int)EventIds.ChangingStatus, $"Cloud connection for {identity.Id} is {isConnected.Get()}");
}

/// <summary>
/// Writes an information-level log entry, tagged with <see cref="EventIds.FailOverDetected"/>,
/// stating that the named operation failed for the identity because of fail-over.
/// </summary>
/// <param name="identity">Identity whose <c>Id</c> is embedded in the message.</param>
/// <param name="operation">Name of the operation that failed.</param>
/// <param name="ex">Exception handed to the logger together with the message.</param>
public static void FailOverDetected(IIdentity identity, string operation, Exception ex)
{
    string logMessage = $"Operation {operation} failed for {identity.Id} because of fail-over";
    Log.LogInformation((int)EventIds.FailOverDetected, ex, logMessage);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy
{
using System;
using DotNetty.Transport.Channels;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Edge.Util;

public static class ExceptionMapper
{
const string FailOverMessage = "(condition='com.microsoft:iot-hub-not-found-error')";

public static Exception GetEdgeException(this Exception sdkException, string operation)
{
Preconditions.CheckNonWhiteSpace(operation, nameof(operation));
Expand All @@ -20,5 +23,15 @@ public static Exception GetEdgeException(this Exception sdkException, string ope

return sdkException;
}

/// <summary>
/// Reports whether the exception matches the fail-over signature used by this class:
/// an <see cref="IotHubException"/> whose direct inner exception has a non-empty message
/// containing <c>FailOverMessage</c>.
/// </summary>
/// <param name="ex">Exception to inspect; a null reference yields <c>false</c>.</param>
/// <returns><c>true</c> when every condition above holds; otherwise <c>false</c>.</returns>
public static bool IsFailOver(this Exception ex)
{
    // Only IotHubException instances are candidates (this also rejects null).
    if (!(ex is IotHubException))
    {
        return false;
    }

    // Only the immediate inner exception is examined, not the whole chain.
    string innerMessage = ex.InnerException?.Message;

    return !string.IsNullOrEmpty(innerMessage)
        && innerMessage.Contains(FailOverMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy.Test
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
Expand Down Expand Up @@ -246,6 +247,32 @@ public async Task TestHandleNonRecoverableExceptions(Type exceptionType)
client.VerifyAll();
}

[Fact]
public async Task TestHandleFailOverExceptions()
{
    // Arrange: an IotHubException wrapping an AMQP error whose description carries
    // the "(condition='...')" text for the iot-hub-not-found-error condition.
    var failOverCondition = new Amqp.Encoding.AmqpSymbol("com.microsoft:iot-hub-not-found-error");
    var innerAmqpError = new Amqp.AmqpException(failOverCondition, $"(condition='{failOverCondition}')");
    var failOverException = new IotHubException(innerAmqpError);

    // Strict mock: only the two calls configured below are permitted on the client.
    var client = new Mock<IClient>(MockBehavior.Strict);
    client.Setup(c => c.CloseAsync()).Returns(Task.CompletedTask);
    client.Setup(c => c.SendEventAsync(It.IsAny<Message>())).ThrowsAsync(failOverException);

    var converter = Mock.Of<IMessageConverter<Message>>(mc => mc.FromMessage(It.IsAny<IMessage>()) == new Message());
    var converterProvider = Mock.Of<IMessageConverterProvider>(p => p.Get<Message>() == converter);
    var listener = Mock.Of<ICloudListener>();
    Action<string, CloudConnectionStatus> onConnectionStatusChanged = (s, status) => { };
    string clientId = "d1";
    TimeSpan idleTimeout = TimeSpan.FromSeconds(60);

    var cloudProxy = new CloudProxy(
        client.Object,
        converterProvider,
        clientId,
        onConnectionStatusChanged,
        listener,
        idleTimeout,
        false);
    IMessage message = new EdgeMessage.Builder(new byte[0]).Build();

    // Act: the send is expected to surface the IotHubException to the caller.
    await Assert.ThrowsAsync<IotHubException>(() => cloudProxy.SendMessageAsync(message));

    // Assert: every setup on the client, CloseAsync included, must have been invoked.
    client.VerifyAll();
}

static async Task CheckMessageInEventHub(Dictionary<string, IList<IMessage>> sentMessagesByDevice, DateTime startTime)
{
string eventHubConnectionString = await SecretsHelper.GetSecretFromConfigKey("eventHubConnStrKey");
Expand Down

0 comments on commit 52c563a

Please sign in to comment.