diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/brokerConnection/MqttBrokerConnector.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/brokerConnection/MqttBrokerConnector.cs index b6ee2f93623..fcc61e42084 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/brokerConnection/MqttBrokerConnector.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter/brokerConnection/MqttBrokerConnector.cs @@ -3,10 +3,12 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter { using System; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.Core; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Concurrency; using Microsoft.Extensions.Logging; @@ -92,7 +94,7 @@ public async Task ConnectAsync(string serverAddress, int port) } Events.CouldNotConnect(); - throw new Exception("Failed to start MQTT broker connector"); + throw new EdgeHubConnectionException("Failed to start MQTT broker connector"); } client.ConnectionClosed += this.TriggerReconnect; @@ -166,7 +168,7 @@ await clientToStop.ForEachAsync( public async Task SendAsync(string topic, byte[] payload, bool retain = false) { - var client = this.mqttClient.Expect(() => new Exception("No mqtt-bridge connector instance found to send messages.")); + var client = this.mqttClient.Expect(() => new IOException("No mqtt-bridge connector instance found to send messages.")); var added = default(bool); var tcs = new TaskCompletionSource(); @@ -185,7 +187,7 @@ public async Task SendAsync(string topic, byte[] payload, bool retain = fa // if this happens it means that previously a message was sent out with the same message id but // then it wasn't deleted from the penging acks. that is either we went around with all the message ids // or some program error didn't delete it. not much to do either way. - new Exception("Could not store message id to monitor Mqtt ACK"); + new IOException("Could not store message id to monitor Mqtt ACK"); } bool result; @@ -295,6 +297,11 @@ void TriggerReconnect(object sender, EventArgs e) client.ConnectionClosed += this.TriggerReconnect; } + catch (Exception) + { + Events.NoMqttClientWhenReconnecting(); + return; + } finally { this.isRetrying.Set(false); @@ -549,7 +556,8 @@ enum EventIds MessageNotForwarded, FailedToForward, CouldNotConnect, - TimeoutReceivingSubAcks + TimeoutReceivingSubAcks, + NoMqttClientWhenReconnecting } public static void Starting() => Log.LogInformation((int)EventIds.Starting, "Starting mqtt-bridge connector"); @@ -573,6 +581,7 @@ enum EventIds public static void FailedToForwardDownstream(Exception e) => Log.LogError((int)EventIds.FailedToForwardDownstream, e, "Failed to forward message from downstream."); public static void CouldNotConnect() => Log.LogInformation((int)EventIds.CouldNotConnect, "Could not connect to MQTT Broker, possibly it is not running. To disable MQTT Broker Connector, please set 'mqttBrokerSettings__enabled' environment variable to 'false'"); public static void TimeoutReceivingSubAcks(Exception e) => Log.LogError((int)EventIds.TimeoutReceivingSubAcks, e, "MQTT Broker has not acknowledged subscriptions in time"); + public static void NoMqttClientWhenReconnecting() => Log.LogError((int)EventIds.NoMqttClientWhenReconnecting, "No Mqtt client instance when trying to reconnect - stopped trying."); } } }