Skip to content

Commit

Permalink
Fix: Delay frequent twin pulls on reconnect (Azure#5148)
Browse files Browse the repository at this point in the history
Agent pulls twin on reconnect, so desired property changes will be learned if those happened while the device was disconnected. In certain circumstances it can happen that a device keeps connecting/disconnecting. One particular case is when two edge devices use the same identity and when one device connects, the other gets disconnected. In turn the disconnected device try to connect back and this makes the first device get disconnected. This fight over the connection can result in several twin pulls per seconds.

This solution delays subsequent twin pulls if those happen within a certain time window. The throttling applies only on connection, so if for some reason iothub sends frequent desired property changes, those are not throttled.

The solution below has a flaw: if the delay is e.g. 30 seconds (which is now and it is hardcoded), then if there was a pull 29 seconds ago and a new one comes in, that will be delayed by 30 seconds, giving an 59 second window. I was considering calculating the windows size dynamically, e.g. removing the time elapsed from the previous twin pull giving a more steady 30 sec delay windows, however weighting the frequency of the occurrence this protection is needed and the additional complexity of the code, I chose the simpler code.
  • Loading branch information
vipeller authored Jun 25, 2021
1 parent 244723e commit c87e85b
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using System.Linq;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core;
Expand All @@ -26,9 +27,12 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub

public class EdgeAgentConnection : IEdgeAgentConnection
{
const int PullFrequencyThreshold = 10;

internal static readonly Version ExpectedSchemaVersion = new Version("1.1.0");
static readonly TimeSpan DefaultConfigRefreshFrequency = TimeSpan.FromHours(1);
static readonly TimeSpan DeviceClientInitializationWaitTime = TimeSpan.FromSeconds(5);
static readonly TimeSpan DefaultTwinPullOnConnectThrottleTime = TimeSpan.FromSeconds(30);

static readonly ITransientErrorDetectionStrategy AllButFatalErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.IsFatal() == false);

Expand All @@ -45,19 +49,24 @@ public class EdgeAgentConnection : IEdgeAgentConnection
readonly IDeviceManager deviceManager;
readonly IDeploymentMetrics deploymentMetrics;
readonly Option<X509Certificate2> manifestTrustBundle;
readonly TimeSpan twinPullOnConnectThrottleTime;

Option<TwinCollection> desiredProperties;
Option<TwinCollection> reportedProperties;
Option<DeploymentConfigInfo> deploymentConfigInfo;

DateTime lastTwinPullOnConnect = DateTime.MinValue;
AtomicBoolean isDelayedTwinPullInProgress = new AtomicBoolean(false);
int pullRequestCounter = 0;

public EdgeAgentConnection(
IModuleClientProvider moduleClientProvider,
ISerde<DeploymentConfig> desiredPropertiesSerDe,
IRequestManager requestManager,
IDeviceManager deviceManager,
IDeploymentMetrics deploymentMetrics,
Option<X509Certificate2> manifestTrustBundle)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, true, DefaultConfigRefreshFrequency, TransientRetryStrategy, deploymentMetrics, manifestTrustBundle)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, true, DefaultConfigRefreshFrequency, TransientRetryStrategy, deploymentMetrics, manifestTrustBundle, DefaultTwinPullOnConnectThrottleTime)
{
}

Expand All @@ -70,7 +79,7 @@ public EdgeAgentConnection(
TimeSpan configRefreshFrequency,
IDeploymentMetrics deploymentMetrics,
Option<X509Certificate2> manifestTrustBundle)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, enableSubscriptions, configRefreshFrequency, TransientRetryStrategy, deploymentMetrics, manifestTrustBundle)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, enableSubscriptions, configRefreshFrequency, TransientRetryStrategy, deploymentMetrics, manifestTrustBundle, DefaultTwinPullOnConnectThrottleTime)
{
}

Expand All @@ -83,7 +92,8 @@ internal EdgeAgentConnection(
TimeSpan refreshConfigFrequency,
RetryStrategy retryStrategy,
IDeploymentMetrics deploymentMetrics,
Option<X509Certificate2> manifestTrustBundle)
Option<X509Certificate2> manifestTrustBundle,
TimeSpan twinPullOnConnectThrottleTime)
{
this.desiredPropertiesSerDe = Preconditions.CheckNotNull(desiredPropertiesSerDe, nameof(desiredPropertiesSerDe));
this.deploymentConfigInfo = Option.None<DeploymentConfigInfo>();
Expand All @@ -97,6 +107,7 @@ internal EdgeAgentConnection(
this.deploymentMetrics = Preconditions.CheckNotNull(deploymentMetrics, nameof(deploymentMetrics));
this.initTask = this.ForceRefreshTwin();
this.manifestTrustBundle = manifestTrustBundle;
this.twinPullOnConnectThrottleTime = twinPullOnConnectThrottleTime;
}

public Option<TwinCollection> ReportedProperties => this.reportedProperties;
Expand Down Expand Up @@ -238,9 +249,28 @@ async void OnConnectionStatusChanged(ConnectionStatus status, ConnectionStatusCh

if (this.pullOnReconnect && this.initTask.IsCompleted && status == ConnectionStatus.Connected)
{
var delayedTwinPull = true;
using (await this.twinLock.LockAsync())
{
await this.RefreshTwinAsync();
var now = DateTime.Now;
if (now - this.lastTwinPullOnConnect > this.twinPullOnConnectThrottleTime && !this.isDelayedTwinPullInProgress.Get())
{
this.lastTwinPullOnConnect = now;
await this.RefreshTwinAsync();
delayedTwinPull = false;
}
}

if (delayedTwinPull)
{
if (this.isDelayedTwinPullInProgress.GetAndSet(true))
{
Interlocked.Increment(ref this.pullRequestCounter);
}
else
{
_ = this.DelayedRefreshTwinAsync();
}
}
}
}
Expand All @@ -250,6 +280,38 @@ async void OnConnectionStatusChanged(ConnectionStatus status, ConnectionStatusCh
}
}

async Task DelayedRefreshTwinAsync()
{
Events.StartedDelayedTwinPull();
await Task.Delay(this.twinPullOnConnectThrottleTime);

var requestCounter = default(int);
using (await this.twinLock.LockAsync())
{
this.lastTwinPullOnConnect = DateTime.Now;

try
{
await this.RefreshTwinAsync();
}
catch
{
// swallowing intentionally
}

requestCounter = Interlocked.Exchange(ref this.pullRequestCounter, 0);

this.isDelayedTwinPullInProgress.Set(false);
}

if (requestCounter > PullFrequencyThreshold)
{
Events.PullingTwinHasBeenTriggeredFrequently(requestCounter, Convert.ToInt32(this.twinPullOnConnectThrottleTime.TotalSeconds));
}

Events.FinishedDelayedTwinPull();
}

async Task OnDesiredPropertiesUpdated(TwinCollection desiredPropertiesPatch, object userContext)
{
Events.DesiredPropertiesUpdated();
Expand Down Expand Up @@ -572,7 +634,10 @@ enum EventIds
ManifestSigningIsEnabled,
ManifestSigningIsNotEnabled,
ManifestTrustBundleIsNotConfigured,
DeploymentManifestIsNotSigned
DeploymentManifestIsNotSigned,
PullingTwinHasBeenTriggeredFrequently,
StartedDelayedTwinPull,
FinishedDelayedTwinPull
}

public static void DesiredPropertiesPatchFailed(Exception exception)
Expand Down Expand Up @@ -756,6 +821,21 @@ internal static void DeploymentManifestIsNotSigned()
{
Log.LogWarning((int)EventIds.DeploymentManifestIsNotSigned, $"Manifest Trust bundle is configured but the Deployment manifest is not signed. Please sign it.");
}

internal static void PullingTwinHasBeenTriggeredFrequently(int count, int seconds)
{
Log.LogWarning((int)EventIds.PullingTwinHasBeenTriggeredFrequently, $"Pulling twin by 'Connected' event has been triggered frequently, {count} times in the last {seconds} seconds. This can be a sign when more edge devices use the same identity and they keep getting disconnected.");
}

internal static void StartedDelayedTwinPull()
{
Log.LogDebug((int)EventIds.StartedDelayedTwinPull, $"Started delayed twin-pull");
}

internal static void FinishedDelayedTwinPull()
{
Log.LogDebug((int)EventIds.FinishedDelayedTwinPull, $"Finished delayed twin-pull");
}
}
}
}
Loading

0 comments on commit c87e85b

Please sign in to comment.