Skip to content

Commit

Permalink
Fix: throw transient error when edgeHubCore is disconnected from the …
Browse files Browse the repository at this point in the history
…broker (Azure#4603)

so messages will be resent instead of dropping them
  • Loading branch information
vipeller authored and damonbarry committed Apr 14, 2022
1 parent 6544515 commit 6659f12
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
Expand Down Expand Up @@ -32,7 +33,7 @@ public async Task<Try<ICloudConnection>> Connect(IIdentity identity, Action<stri
{
if (!await this.IsConnected())
{
return new Try<ICloudConnection>(new Exception("Bridge is not connected upstream"));
return new Try<ICloudConnection>(new IotHubException("Bridge is not connected upstream"));
}

var cloudProxy = new BrokeredCloudProxy(identity, this.cloudProxyDispatcher, connectionStatusChangedHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.CloudProxy;
using Microsoft.Azure.Devices.Edge.Hub.Core;
Expand Down Expand Up @@ -121,6 +122,43 @@ void Spy(RpcPacket packet)
}
}

[Fact]
public async Task WhenNoConnectionMessagesDontGetDropped()
{
// The motivation of this test was a bug, when throwing a bad exception type caused edgeHub to drop messages
// while edgeHubCore was disconnected from the MQTT broker. This test ensures that the message drop was
// due to the wrong exception type, and now that it is fixed, no message drop occures.
var (subscriptionChangeHandler, cloudProxyDispatcher, brokerConnector) = await SetupEnvironment();
var milestone = new SemaphoreSlim(0, 1);
var shouldReceiveNow = false;
var deviceId = "device_1";
var messageContent = "test message";

var edgeHub = cloudProxyDispatcher.AsPrivateAccessible().edgeHub as IEdgeHub;
brokerConnector.SetPacketSpy(Spy);

var identity = new DeviceIdentity(iotHubName, deviceId);
await edgeHub.ProcessDeviceMessage(identity, new EdgeMessage(Encoding.UTF8.GetBytes(messageContent), new Dictionary<string, string>(), new Dictionary<string, string>() { [Core.SystemProperties.ConnectionDeviceId] = deviceId } ));

// the bridge-connector keeps trying for 5 seconds, so let's wait a safe 10 seconds to be sure that the first attempt to send a message fails
await Task.Delay(TimeSpan.FromSeconds(10));

shouldReceiveNow = true;
await cloudProxyDispatcher.HandleAsync(new MqttPublishInfo("$internal/connectivity", Encoding.UTF8.GetBytes("{\"status\":\"Connected\"}")));

// the retry-config is to retry every 5 sec, so 10 should be enough
Assert.True(await milestone.WaitAsync(TimeSpan.FromSeconds(10)));

void Spy(RpcPacket packet)
{
Assert.True(shouldReceiveNow);
Assert.Equal("pub", packet.Cmd);
Assert.Equal(messageContent, Encoding.UTF8.GetString(packet.Payload));

milestone.Release();
}
}

async Task SetupSpyAndGenerateEvents(Action<RpcPacket> spy, IMessageConsumer cloudProxyDispatcher, NullBrokerConnector brokerConnector)
{
await cloudProxyDispatcher.HandleAsync(new MqttPublishInfo("$internal/connectivity", Encoding.UTF8.GetBytes("{\"status\":\"Connected\"}")));
Expand Down Expand Up @@ -198,7 +236,7 @@ async Task<HashSet<IIdentity>> ConnectClientsAsync(IMessageConsumer subscription
Routing.PerfCounter = NullRoutingPerfCounter.Instance;
Routing.UserAnalyticsLogger = NullUserAnalyticsLogger.Instance;

var defaultRetryStrategy = new FixedInterval(0, TimeSpan.FromSeconds(1));
var defaultRetryStrategy = new FixedInterval(5, TimeSpan.FromSeconds(5));
var defaultRevivePeriod = TimeSpan.FromHours(1);
var defaultTimeout = TimeSpan.FromSeconds(60);
var endpointExecutorConfig = new EndpointExecutorConfig(defaultTimeout, defaultRetryStrategy, defaultRevivePeriod, true);
Expand Down

0 comments on commit 6659f12

Please sign in to comment.