From 6659f1277c45583b1cb3c03c7d9aa28e7808a6f4 Mon Sep 17 00:00:00 2001 From: vipeller <51135538+vipeller@users.noreply.github.com> Date: Wed, 17 Mar 2021 10:30:44 -0700 Subject: [PATCH] Fix: throw transient error when edgeHubCore is disconnected from the broker (#4603) so messages will be resent instead of dropping them --- .../BrokeredCloudConnectionProvider.cs | 3 +- .../ConnectionEventTest.cs | 40 ++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/upstream/BrokeredCloudConnectionProvider.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/upstream/BrokeredCloudConnectionProvider.cs index 11d6dea3452..4e45a67082c 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/upstream/BrokeredCloudConnectionProvider.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/upstream/BrokeredCloudConnectionProvider.cs @@ -3,6 +3,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter { using System; using System.Threading.Tasks; + using Microsoft.Azure.Devices.Client.Exceptions; using Microsoft.Azure.Devices.Edge.Hub.Core; using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud; using Microsoft.Azure.Devices.Edge.Hub.Core.Identity; @@ -32,7 +33,7 @@ public async Task> Connect(IIdentity identity, Action(new Exception("Bridge is not connected upstream")); + return new Try(new IotHubException("Bridge is not connected upstream")); } var cloudProxy = new BrokeredCloudProxy(identity, this.cloudProxyDispatcher, connectionStatusChangedHandler); diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test/ConnectionEventTest.cs b/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test/ConnectionEventTest.cs index b1f0a797ab7..1453da74b7d 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test/ConnectionEventTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test/ConnectionEventTest.cs @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test using System.Linq; using System.Text; using System.Text.RegularExpressions; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Devices.Edge.Hub.CloudProxy; using Microsoft.Azure.Devices.Edge.Hub.Core; @@ -121,6 +122,43 @@ void Spy(RpcPacket packet) } } + [Fact] + public async Task WhenNoConnectionMessagesDontGetDropped() + { + // The motivation of this test was a bug, when throwing a bad exception type caused edgeHub to drop messages + // while edgeHubCore was disconnected from the MQTT broker. This test ensures that the message drop was + // due to the wrong exception type, and now that it is fixed, no message drop occures. + var (subscriptionChangeHandler, cloudProxyDispatcher, brokerConnector) = await SetupEnvironment(); + var milestone = new SemaphoreSlim(0, 1); + var shouldReceiveNow = false; + var deviceId = "device_1"; + var messageContent = "test message"; + + var edgeHub = cloudProxyDispatcher.AsPrivateAccessible().edgeHub as IEdgeHub; + brokerConnector.SetPacketSpy(Spy); + + var identity = new DeviceIdentity(iotHubName, deviceId); + await edgeHub.ProcessDeviceMessage(identity, new EdgeMessage(Encoding.UTF8.GetBytes(messageContent), new Dictionary(), new Dictionary() { [Core.SystemProperties.ConnectionDeviceId] = deviceId } )); + + // the bridge-connector keeps trying for 5 seconds, so let's wait a safe 10 seconds to be sure that the first attempt to send a message fails + await Task.Delay(TimeSpan.FromSeconds(10)); + + shouldReceiveNow = true; + await cloudProxyDispatcher.HandleAsync(new MqttPublishInfo("$internal/connectivity", Encoding.UTF8.GetBytes("{\"status\":\"Connected\"}"))); + + // the retry-config is to retry every 5 sec, so 10 should be enough + Assert.True(await milestone.WaitAsync(TimeSpan.FromSeconds(10))); + + void Spy(RpcPacket packet) + { + Assert.True(shouldReceiveNow); + Assert.Equal("pub", packet.Cmd); + Assert.Equal(messageContent, Encoding.UTF8.GetString(packet.Payload)); + + milestone.Release(); + } + } + async Task SetupSpyAndGenerateEvents(Action spy, IMessageConsumer cloudProxyDispatcher, NullBrokerConnector brokerConnector) { await cloudProxyDispatcher.HandleAsync(new MqttPublishInfo("$internal/connectivity", Encoding.UTF8.GetBytes("{\"status\":\"Connected\"}"))); @@ -198,7 +236,7 @@ async Task> ConnectClientsAsync(IMessageConsumer subscription Routing.PerfCounter = NullRoutingPerfCounter.Instance; Routing.UserAnalyticsLogger = NullUserAnalyticsLogger.Instance; - var defaultRetryStrategy = new FixedInterval(0, TimeSpan.FromSeconds(1)); + var defaultRetryStrategy = new FixedInterval(5, TimeSpan.FromSeconds(5)); var defaultRevivePeriod = TimeSpan.FromHours(1); var defaultTimeout = TimeSpan.FromSeconds(60); var endpointExecutorConfig = new EndpointExecutorConfig(defaultTimeout, defaultRetryStrategy, defaultRevivePeriod, true);