Skip to content

Commit

Permalink
Fixed exception type in BrokerConnection::SendAsync, (Azure#5091)
Browse files Browse the repository at this point in the history
so operation will be retried if no mqttclient exists.

The problem occurred when EdgeHub was shutting down but a message was being sent upstream. As the protocol head was stopped, that cleared the mqttclient client, so the send operation in SendAsync() threw an 'Exception' instance, which was not retriable by CloudEndpoint, so it just skipped the message.
  • Loading branch information
vipeller authored and damonbarry committed Apr 14, 2022
1 parent 464657a commit de0cec1
Showing 1 changed file with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -92,7 +94,7 @@ public async Task ConnectAsync(string serverAddress, int port)
}

Events.CouldNotConnect();
throw new Exception("Failed to start MQTT broker connector");
throw new EdgeHubConnectionException("Failed to start MQTT broker connector");
}

client.ConnectionClosed += this.TriggerReconnect;
Expand Down Expand Up @@ -166,7 +168,7 @@ await clientToStop.ForEachAsync(

public async Task<bool> SendAsync(string topic, byte[] payload, bool retain = false)
{
var client = this.mqttClient.Expect(() => new Exception("No mqtt-bridge connector instance found to send messages."));
var client = this.mqttClient.Expect(() => new IOException("No mqtt-bridge connector instance found to send messages."));

var added = default(bool);
var tcs = new TaskCompletionSource<bool>();
Expand All @@ -185,7 +187,7 @@ public async Task<bool> SendAsync(string topic, byte[] payload, bool retain = fa
// if this happens it means that previously a message was sent out with the same message id but
// then it wasn't deleted from the penging acks. that is either we went around with all the message ids
// or some program error didn't delete it. not much to do either way.
new Exception("Could not store message id to monitor Mqtt ACK");
new IOException("Could not store message id to monitor Mqtt ACK");
}

bool result;
Expand Down Expand Up @@ -295,6 +297,11 @@ void TriggerReconnect(object sender, EventArgs e)
client.ConnectionClosed += this.TriggerReconnect;
}
catch (Exception)
{
Events.NoMqttClientWhenReconnecting();
return;
}
finally
{
this.isRetrying.Set(false);
Expand Down Expand Up @@ -549,7 +556,8 @@ enum EventIds
MessageNotForwarded,
FailedToForward,
CouldNotConnect,
TimeoutReceivingSubAcks
TimeoutReceivingSubAcks,
NoMqttClientWhenReconnecting
}

public static void Starting() => Log.LogInformation((int)EventIds.Starting, "Starting mqtt-bridge connector");
Expand All @@ -573,6 +581,7 @@ enum EventIds
public static void FailedToForwardDownstream(Exception e) => Log.LogError((int)EventIds.FailedToForwardDownstream, e, "Failed to forward message from downstream.");
public static void CouldNotConnect() => Log.LogInformation((int)EventIds.CouldNotConnect, "Could not connect to MQTT Broker, possibly it is not running. To disable MQTT Broker Connector, please set 'mqttBrokerSettings__enabled' environment variable to 'false'");
public static void TimeoutReceivingSubAcks(Exception e) => Log.LogError((int)EventIds.TimeoutReceivingSubAcks, e, "MQTT Broker has not acknowledged subscriptions in time");
public static void NoMqttClientWhenReconnecting() => Log.LogError((int)EventIds.NoMqttClientWhenReconnecting, "No Mqtt client instance when trying to reconnect - stopped trying.");
}
}
}

0 comments on commit de0cec1

Please sign in to comment.