Skip to content

Commit

Permalink
Fix edgeHub shutdown for renew certificate (#6037)
Browse files Browse the repository at this point in the history
* fix shutdown and E2E test
  • Loading branch information
ancaantochi authored Feb 7, 2022
1 parent cfeea7d commit fcd4d00
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public async Task CloseAsync(CancellationToken token)

public void Dispose()
{
this.CloseAsync(CancellationToken.None).Wait();
}

void OnAcceptTransport(TransportListener transportListener, TransportAsyncCallbackArgs args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class MqttProtocolHead : IProtocolHead

IChannel serverChannel;
IEventLoopGroup eventLoopGroup;
IEventLoopGroup wsEventLoopGroup;
IEventLoopGroup parentEventLoopGroup;

public MqttProtocolHead(
ISettingsProvider settingsProvider,
Expand Down Expand Up @@ -106,13 +108,15 @@ public async Task CloseAsync(CancellationToken token)
{
try
{
this.logger.LogInformation("Stopping");
this.logger.LogInformation("Stopping MQTT protocol head");

await (this.serverChannel?.CloseAsync() ?? TaskEx.Done);
await (this.eventLoopGroup?.ShutdownGracefullyAsync() ?? TaskEx.Done);
await (this.parentEventLoopGroup?.ShutdownGracefullyAsync() ?? TaskEx.Done);
await (this.wsEventLoopGroup?.ShutdownGracefullyAsync() ?? TaskEx.Done);
// TODO: gracefully shutdown the MultithreadEventLoopGroup in MqttWebSocketListener?
// TODO: this.webSocketListenerRegistry.TryUnregister("mqtts")?
this.logger.LogInformation("Stopped");
this.logger.LogInformation("Stopped MQTT protocol head");
}
catch (Exception ex)
{
Expand All @@ -123,7 +127,6 @@ public async Task CloseAsync(CancellationToken token)
public void Dispose()
{
this.mqttConnectionProvider.Dispose();
this.CloseAsync(CancellationToken.None).Wait();
}

ServerBootstrap SetupServerBootstrap()
Expand All @@ -138,11 +141,10 @@ ServerBootstrap SetupServerBootstrap()

var bootstrap = new ServerBootstrap();
// multithreaded event loop that handles the incoming connection
IEventLoopGroup parentEventLoopGroup = new MultithreadEventLoopGroup(parentEventLoopCount);
this.parentEventLoopGroup = new MultithreadEventLoopGroup(parentEventLoopCount);
// multithreaded event loop (worker) that handles the traffic of the accepted connections
this.eventLoopGroup = new MultithreadEventLoopGroup(threadCount);

bootstrap.Group(parentEventLoopGroup, this.eventLoopGroup)
bootstrap.Group(this.parentEventLoopGroup, this.eventLoopGroup)
.Option(ChannelOption.SoBacklog, listenBacklogSize)
// Allow listening socket to force bind to port if previous socket is still in TIME_WAIT
// Fixes "address is already in use" errors
Expand Down Expand Up @@ -185,14 +187,15 @@ ServerBootstrap SetupServerBootstrap()
bridgeFactory));
}));

this.wsEventLoopGroup = new MultithreadEventLoopGroup(Environment.ProcessorCount);
var mqttWebSocketListener = new MqttWebSocketListener(
settings,
bridgeFactory,
this.authenticator,
this.usernameParser,
this.clientCredentialsFactory,
() => this.sessionProvider,
new MultithreadEventLoopGroup(Environment.ProcessorCount),
this.wsEventLoopGroup,
this.byteBufferAllocator,
AutoRead,
maxInboundMessageSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ await hostToStop.Match(

public void Dispose()
{
this.DisposeAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public async Task CloseAsync(CancellationToken token)
Events.Closed();
}

public void Dispose() => this.CloseAsync(CancellationToken.None).Wait();
public void Dispose()
{
}

static class Events
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service

public class CertificateRenewal : IDisposable
{
static readonly TimeSpan MaxRenewAfter = TimeSpan.FromMilliseconds(int.MaxValue);
static readonly TimeSpan DefaultMaxRenewAfter = TimeSpan.FromMilliseconds(int.MaxValue);
static readonly TimeSpan TimeBuffer = TimeSpan.FromMinutes(5);

readonly TimeSpan maxRenewAfter;
readonly EdgeHubCertificates certificates;
readonly ILogger logger;
readonly Timer timer;
readonly CancellationTokenSource cts;

public CertificateRenewal(EdgeHubCertificates certificates, ILogger logger)
public CertificateRenewal(EdgeHubCertificates certificates, ILogger logger, TimeSpan maxRenewAfter)
{
this.certificates = Preconditions.CheckNotNull(certificates, nameof(certificates));
this.logger = Preconditions.CheckNotNull(logger, nameof(logger));
this.cts = new CancellationTokenSource();
this.maxRenewAfter = maxRenewAfter;

TimeSpan timeToExpire = certificates.ServerCertificate.NotAfter - DateTime.UtcNow;
if (timeToExpire > TimeBuffer)
Expand All @@ -29,8 +31,8 @@ public CertificateRenewal(EdgeHubCertificates certificates, ILogger logger)
// This is the maximum value for the timer (~24 days)
// Math.Min unfortunately doesn't work with TimeSpans so we need to do the check manually
TimeSpan renewAfter = timeToExpire - (TimeBuffer / 2);
TimeSpan clamped = renewAfter > MaxRenewAfter
? MaxRenewAfter
TimeSpan clamped = renewAfter > this.maxRenewAfter
? this.maxRenewAfter
: renewAfter;
logger.LogInformation("Scheduling server certificate renewal for {0}.", DateTime.UtcNow.Add(renewAfter).ToString("o"));
logger.LogDebug("Scheduling server certificate renewal timer for {0} (clamped to Int32.MaxValue).", DateTime.UtcNow.Add(clamped).ToString("o"));
Expand Down Expand Up @@ -73,7 +75,7 @@ protected virtual void Dispose(bool disposing)
void Callback(object _state)
{
TimeSpan timeToExpire = this.certificates.ServerCertificate.NotAfter - DateTime.UtcNow;
if (timeToExpire > TimeBuffer)
if (timeToExpire > TimeBuffer && this.maxRenewAfter == DefaultMaxRenewAfter)
{
// Timer has expired but is not within the time window for renewal
// Reschedule the timer.
Expand All @@ -82,8 +84,8 @@ void Callback(object _state)
// This is the maximum value for the timer (~24 days)
// Math.Min unfortunately doesn't work with TimeSpans so we need to do the check manually
TimeSpan renewAfter = timeToExpire - (TimeBuffer / 2);
TimeSpan clamped = renewAfter > MaxRenewAfter
? MaxRenewAfter
TimeSpan clamped = renewAfter > this.maxRenewAfter
? this.maxRenewAfter
: renewAfter;
this.logger.LogDebug("Scheduling server certificate renewal timer for {0}.", DateTime.UtcNow.Add(clamped).ToString("o"));
this.timer.Change(clamped, Timeout.InfiniteTimeSpan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
TimeSpan shutdownWaitPeriod = TimeSpan.FromSeconds(configuration.GetValue("ShutdownWaitPeriod", DefaultShutdownWaitPeriod));
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(shutdownWaitPeriod, logger);

double renewAfter = configuration.GetValue("ServerCertificateRenewAfterInMs", int.MaxValue);
renewAfter = renewAfter > int.MaxValue ? int.MaxValue : renewAfter;
TimeSpan maxRenewAfter = TimeSpan.FromMilliseconds(renewAfter);
using (IProtocolHead mqttBrokerProtocolHead = await GetMqttBrokerProtocolHeadAsync(experimentalFeatures, container))
using (IProtocolHead edgeHubProtocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, experimentalFeatures, container, hosting))
using (var renewal = new CertificateRenewal(certificates, logger))
using (var renewal = new CertificateRenewal(certificates, logger, maxRenewAfter))
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public async Task StartsUpAndServes()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -71,6 +73,7 @@ public async Task CannotStartTwice()
{
await sut.StartAsync();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await sut.StartAsync());
await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -90,6 +93,7 @@ public async Task DeniesNoPasswordNorCertificate()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);
await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -112,6 +116,8 @@ public async Task DeniesBothPasswordAndCertificate()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -132,6 +138,8 @@ public async Task DeniesBadCertificateFormat()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -152,6 +160,8 @@ public async Task DeniesNoVersion()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -173,6 +183,8 @@ public async Task DeniesBadVersion()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -199,6 +211,8 @@ public async Task AcceptsGoodTokenDeniesBadToken()

response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -224,6 +238,8 @@ public async Task AcceptsGoodThumbprintDeniesBadThumbprint()

response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand Down Expand Up @@ -254,6 +270,8 @@ public async Task AcceptsGoodCaDeniesBadCa()

response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -275,6 +293,8 @@ public async Task ReturnsDeviceIdentity()
var response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);
Assert.Equal("testhub/device", (string)response.identity);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -296,6 +316,8 @@ public async Task ReturnsModuleIdentity()
var response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);
Assert.Equal("testhub/device/module", (string)response.identity);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -310,6 +332,8 @@ public async Task AcceptsRequestWithContentLength()
var result = await SendDirectRequest(RequestBody);

Assert.StartsWith(@"{""result"":200,", result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -324,6 +348,8 @@ public async Task AcceptsRequestWithNoContentLength()
var result = await SendDirectRequest(RequestBody, withContentLength: false);

Assert.StartsWith(@"{""result"":200,", result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -338,6 +364,8 @@ public async Task DeniesMalformedJsonRequest()
var result = await SendDirectRequest(NonJSONRequestBody);

Assert.StartsWith(@"{""result"":403,", result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand Down Expand Up @@ -379,6 +407,8 @@ public async Task StoresMetadataCorrectly()
var modelId = (await metadataStore.GetMetadata("device")).ModelId;
Assert.True(modelId.HasValue);
Assert.Equal(modelIdString, modelId.GetOrElse("impossibleValue"));

await sut.CloseAsync(CancellationToken.None);
}
}

Expand Down
38 changes: 38 additions & 0 deletions test/Microsoft.Azure.Devices.Edge.Test/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,44 @@ public class Module : SasManualProvisioningFixture
const string SensorName = "tempSensor";
const string DefaultSensorImage = "mcr.microsoft.com/azureiotedge-simulated-temperature-sensor:1.0";

[Test]
[Category("CentOsSafe")]
public async Task CertRenew()
{
CancellationToken token = this.TestToken;

EdgeDeployment deployment = await this.runtime.DeployConfigurationAsync(
builder =>
{
builder.GetModule("$edgeHub").WithEnvironment(("ServerCertificateRenewAfterInMs", "6000"));
},
token,
Context.Current.NestedEdge);

EdgeModule edgeHub = deployment.Modules[ModuleName.EdgeHub];
await edgeHub.WaitForStatusAsync(EdgeModuleStatus.Running, token);
EdgeModule edgeAgent = deployment.Modules[ModuleName.EdgeAgent];
// certificate renew should stop edgeHub and then it should be started by edgeAgent
await edgeAgent.WaitForReportedPropertyUpdatesAsync(
new
{
properties = new
{
reported = new
{
systemModules = new
{
edgeHub = new
{
restartCount = 1
}
}
}
}
},
token);
}

[Test]
[Category("CentOsSafe")]
[Category("nestededge_isa95")]
Expand Down

0 comments on commit fcd4d00

Please sign in to comment.