Skip to content

Commit

Permalink
Restricting EdgeAgent parallel calls to edged to 5 (#5633) (#5720)
Browse files Browse the repository at this point in the history
When edge agent creates several modules, it may send too many requests parallel to edged, which led errors in the past
  • Loading branch information
lfitchett authored Oct 21, 2021
1 parent f12d7ca commit 2391cd9
Showing 1 changed file with 51 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Edgelet

public class ModuleManagementHttpClient : IModuleManager, IIdentityManager, IDeviceManager
{
const int MaxConcurrentRequests = 5;

readonly ModuleManagementHttpClientVersioned inner;

readonly TimeSpan clientPermitTimeout = TimeSpan.FromSeconds(240);
readonly SemaphoreSlim clientPermit = new SemaphoreSlim(MaxConcurrentRequests);

public ModuleManagementHttpClient(Uri managementUri, string serverSupportedApiVersion, string clientSupportedApiVersion)
{
Preconditions.CheckNotNull(managementUri, nameof(managementUri));
Expand All @@ -25,43 +30,43 @@ public ModuleManagementHttpClient(Uri managementUri, string serverSupportedApiVe
this.inner = GetVersionedModuleManagement(managementUri, serverSupportedApiVersion, clientSupportedApiVersion);
}

public Task<Identity> CreateIdentityAsync(string name, string managedBy) => this.inner.CreateIdentityAsync(name, managedBy);
public Task<Identity> CreateIdentityAsync(string name, string managedBy) => this.Throttle(() => this.inner.CreateIdentityAsync(name, managedBy));

public Task<Identity> UpdateIdentityAsync(string name, string generationId, string managedBy) => this.inner.UpdateIdentityAsync(name, generationId, managedBy);
public Task<Identity> UpdateIdentityAsync(string name, string generationId, string managedBy) => this.Throttle(() => this.inner.UpdateIdentityAsync(name, generationId, managedBy));

public Task DeleteIdentityAsync(string name) => this.inner.DeleteIdentityAsync(name);
public Task DeleteIdentityAsync(string name) => this.Throttle(() => this.inner.DeleteIdentityAsync(name));

public Task<IEnumerable<Identity>> GetIdentities() => this.inner.GetIdentities();
public Task<IEnumerable<Identity>> GetIdentities() => this.Throttle(() => this.inner.GetIdentities());

public Task CreateModuleAsync(ModuleSpec moduleSpec) => this.inner.CreateModuleAsync(moduleSpec);
public Task CreateModuleAsync(ModuleSpec moduleSpec) => this.Throttle(() => this.inner.CreateModuleAsync(moduleSpec));

public Task StartModuleAsync(string name) => this.inner.StartModuleAsync(name);
public Task StartModuleAsync(string name) => this.Throttle(() => this.inner.StartModuleAsync(name));

public Task StopModuleAsync(string name) => this.inner.StopModuleAsync(name);
public Task StopModuleAsync(string name) => this.Throttle(() => this.inner.StopModuleAsync(name));

public Task DeleteModuleAsync(string name) => this.inner.DeleteModuleAsync(name);
public Task DeleteModuleAsync(string name) => this.Throttle(() => this.inner.DeleteModuleAsync(name));

public Task RestartModuleAsync(string name) => this.inner.RestartModuleAsync(name);
public Task RestartModuleAsync(string name) => this.Throttle(() => this.inner.RestartModuleAsync(name));

public Task UpdateModuleAsync(ModuleSpec moduleSpec) => this.inner.UpdateModuleAsync(moduleSpec);
public Task UpdateModuleAsync(ModuleSpec moduleSpec) => this.Throttle(() => this.inner.UpdateModuleAsync(moduleSpec));

public Task UpdateAndStartModuleAsync(ModuleSpec moduleSpec) => this.inner.UpdateAndStartModuleAsync(moduleSpec);
public Task UpdateAndStartModuleAsync(ModuleSpec moduleSpec) => this.Throttle(() => this.inner.UpdateAndStartModuleAsync(moduleSpec));

public Task<SystemInfo> GetSystemInfoAsync(CancellationToken cancellationToken) => this.inner.GetSystemInfoAsync(cancellationToken);
public Task<SystemInfo> GetSystemInfoAsync(CancellationToken cancellationToken) => this.Throttle(() => this.inner.GetSystemInfoAsync(cancellationToken));

public Task<SystemResources> GetSystemResourcesAsync() => this.inner.GetSystemResourcesAsync();
public Task<SystemResources> GetSystemResourcesAsync() => this.Throttle(() => this.inner.GetSystemResourcesAsync());

public Task<IEnumerable<ModuleRuntimeInfo>> GetModules<T>(CancellationToken token) => this.inner.GetModules<T>(token);
public Task<IEnumerable<ModuleRuntimeInfo>> GetModules<T>(CancellationToken token) => this.Throttle(() => this.inner.GetModules<T>(token));

public Task PrepareUpdateAsync(ModuleSpec moduleSpec) => this.inner.PrepareUpdateAsync(moduleSpec);
public Task PrepareUpdateAsync(ModuleSpec moduleSpec) => this.Throttle(() => this.inner.PrepareUpdateAsync(moduleSpec));

public Task ReprovisionDeviceAsync() => this.inner.ReprovisionDeviceAsync();
public Task ReprovisionDeviceAsync() => this.Throttle(() => this.inner.ReprovisionDeviceAsync());

public Task<Stream> GetModuleLogs(string name, bool follow, Option<int> tail, Option<string> since, Option<string> until, Option<bool> includeTimestamp, CancellationToken cancellationToken) =>
this.inner.GetModuleLogs(name, follow, tail, since, until, includeTimestamp, cancellationToken);
this.Throttle(() => this.inner.GetModuleLogs(name, follow, tail, since, until, includeTimestamp, cancellationToken));

public Task<Stream> GetSupportBundle(Option<string> since, Option<string> until, Option<string> iothubHostname, Option<bool> edgeRuntimeOnly, CancellationToken token) =>
this.inner.GetSupportBundle(since, until, iothubHostname, edgeRuntimeOnly, token);
this.Throttle(() => this.inner.GetSupportBundle(since, until, iothubHostname, edgeRuntimeOnly, token));

internal static ModuleManagementHttpClientVersioned GetVersionedModuleManagement(Uri managementUri, string serverSupportedApiVersion, string clientSupportedApiVersion)
{
Expand Down Expand Up @@ -112,5 +117,33 @@ static ApiVersion GetSupportedVersion(string serverSupportedApiVersion, string c

return serverVersion.Value < clientVersion.Value ? serverVersion : clientVersion;
}

Task Throttle(Func<Task> identityOperation)
{
return this.Throttle<bool>(
async () =>
{
await identityOperation();
return true;
});
}

async Task<T> Throttle<T>(Func<Task<T>> identityOperation)
{
bool permitAcquired = await this.clientPermit.WaitAsync(this.clientPermitTimeout);
if (!permitAcquired)
{
throw new TimeoutException($"Could not acquire permit to call ModuleManager, hit limit of {MaxConcurrentRequests} concurrent requests");
}

try
{
return await identityOperation();
}
finally
{
this.clientPermit.Release();
}
}
}
}

0 comments on commit 2391cd9

Please sign in to comment.