Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
d5aa3bf
First set of changes for graceful shutdown of worker
surgupta-msft May 2, 2022
e97874e
Adding stop process async
surgupta-msft May 3, 2022
c269436
Adidng stopProcessAsync to send grpc message to worker
surgupta-msft May 3, 2022
b62d5d4
Adding StopWorkerProcess method
surgupta-msft May 3, 2022
3b87dc9
Removing cancellation token
surgupta-msft May 3, 2022
005b4f1
duplicate call removal
surgupta-msft May 3, 2022
677c51c
Adding in capapbilities
surgupta-msft May 13, 2022
05c3c54
Updating stopWorkerProcess method
surgupta-msft May 13, 2022
6725fc8
minor
surgupta-msft May 13, 2022
6885749
Correcting comment
surgupta-msft May 13, 2022
e0e20db
Tests
surgupta-msft May 13, 2022
86d743f
Grace period changes
surgupta-msft May 16, 2022
6e06cb7
Added unit test
surgupta-msft May 16, 2022
3e2236e
Investigating failing tests
surgupta-msft May 16, 2022
da847f9
Added Dispose tests in grpc channel
surgupta-msft May 16, 2022
02d7752
Improving tests
surgupta-msft May 16, 2022
8648a1a
Minor changes
surgupta-msft May 17, 2022
9fabebb
Merging with latest dev
surgupta-msft May 17, 2022
7b9d330
Resolving PR comments
surgupta-msft May 19, 2022
aec780c
Adding delay in a test
surgupta-msft May 19, 2022
e73076f
Added 10sec delay
surgupta-msft May 19, 2022
6d10a1b
Updating grace period to 10 sec
surgupta-msft May 23, 2022
dfe4e84
Code cleanup
surgupta-msft May 23, 2022
9d9012e
Removing blank line and unused libraries
surgupta-msft May 23, 2022
c55152f
Duplicate variable
surgupta-msft May 23, 2022
e5faf24
Merge branch 'dev' into surgupta/graceful-shutdown-worker
surgupta-msft May 23, 2022
2d48f97
Fixing typo
surgupta-msft May 24, 2022
bad68b6
Revert "Fixing typo"
surgupta-msft May 24, 2022
5dd1dbb
Improving wait for gracePeriod
surgupta-msft Jun 3, 2022
c8796c8
Merge branch 'dev' into surgupta/graceful-shutdown-worker
surgupta-msft Jun 15, 2022
7a1f75e
Adding unite test
surgupta-msft Jun 15, 2022
2236f9e
updated UT
surgupta-msft Jun 15, 2022
f921c56
Removing extra log line
surgupta-msft Jun 17, 2022
73b3a38
Merge branch 'dev' into surgupta/graceful-shutdown-worker
surgupta-msft Jun 21, 2022
246862b
Changes to handle exceptions and timeout
surgupta-msft Jun 21, 2022
879626b
Removing dispose from jobobjectregistry
surgupta-msft Jun 22, 2022
576b09f
Removing unused disposed variable
surgupta-msft Jun 22, 2022
68625d7
Changing log level from warning to info
surgupta-msft Jun 22, 2022
3a0d58c
minor change
surgupta-msft Jun 22, 2022
df9b50b
Adding StopProcess in IWorkerProcess
surgupta-msft Jun 24, 2022
31b48ca
Merge branch 'dev' into surgupta/graceful-shutdown-worker
surgupta-msft Jun 29, 2022
cbbd2c0
Added method WaitForProcessTermination()
surgupta-msft Jun 29, 2022
96dbae2
Wait Period in Seconds
surgupta-msft Jun 29, 2022
b8d767e
Fixing test failure
surgupta-msft Jun 29, 2022
38e7b32
Fixing test failures
surgupta-msft Jun 29, 2022
2ed34ef
Changes in workerprocess
surgupta-msft Jun 30, 2022
dd46133
CHanges in workerprocess and tests
surgupta-msft Jun 30, 2022
9da12db
Adding release notes
surgupta-msft Jun 30, 2022
eda9d27
Revert "CHanges in workerprocess and tests"
surgupta-msft Jul 1, 2022
b606e5b
Test changes for debugging
surgupta-msft Jul 6, 2022
d0f52cf
Merging with latest dev
surgupta-msft Jul 11, 2022
cce2cae
Adding capability
surgupta-msft Jul 11, 2022
7886492
Sequential stop and start
surgupta-msft Jul 11, 2022
0d9d953
Updating release notes and minor fixes
surgupta-msft Jul 11, 2022
004acb3
Minor fixes
surgupta-msft Jul 11, 2022
a3b1b39
Updating worker process
surgupta-msft Jul 12, 2022
762f0c0
Reseting worker init timeout
surgupta-msft Jul 12, 2022
b950549
Adding destructor in JobObjectRegistry
surgupta-msft Jul 12, 2022
fbaa461
Removing finally block from worker process
surgupta-msft Jul 12, 2022
b3518f8
Merging with Dev
surgupta-msft Jul 12, 2022
ffe8cfb
Updating release notes
surgupta-msft Jul 12, 2022
4a0b7f0
Removing extra log lines
surgupta-msft Jul 12, 2022
59c2a09
Checking process termination in JobObjectRegistry dispose
surgupta-msft Jul 14, 2022
5a79ab3
Naming updates
surgupta-msft Jul 15, 2022
1cd4f8b
Improving WaitForExit method
surgupta-msft Jul 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
Expand Down Expand Up @@ -901,10 +902,36 @@ protected virtual void Dispose(bool disposing)

public void Dispose()
{
StopWorkerProcess();
_disposing = true;
Dispose(true);
}

private void StopWorkerProcess()
{
bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesWorkerTerminateMessage));
if (!capabilityEnabled)
{
return;
}

int gracePeriod = WorkerConstants.WorkerTerminateGracePeriodInSeconds;

var workerTerminate = new WorkerTerminate()
{
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(gracePeriod))
};

_workerChannelLogger.LogDebug($"Sending WorkerTerminate message with grace period {gracePeriod} seconds.");

SendStreamingMessage(new StreamingMessage
{
WorkerTerminate = workerTerminate
});

WorkerProcess.WaitForProcessExitInMilliSeconds(gracePeriod * 1000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit here, but I don't feel we need to change the name, the method definition/summary should be enough to indicate if the parameter it takes is milliseconds or second

}

public async Task DrainInvocationsAsync()
{
_workerChannelLogger.LogDebug($"Count of in-buffer invocations waiting to be drained out: {_executingInvocations.Count}");
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script.WebHost/WebJobsScriptHostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
var currentHost = ActiveHost;
ActiveHost = null;
Task stopTask = Orphan(currentHost, cancellationToken);
Task result = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(10), cancellationToken));
Task result = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(10)));

if (result != stopTask)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ namespace Microsoft.Azure.WebJobs.Script.Workers
{
internal class EmptyProcessRegistry : IProcessRegistry
{
public bool Register(Process process) => true;
public bool Register(WorkerProcess process) => true;

public void Close()
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Microsoft.Azure.WebJobs.Script.Workers
internal interface IProcessRegistry
{
// Registers processes to ensure that they are cleaned up on host exit.
bool Register(Process process);
bool Register(WorkerProcess process);

void Close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public interface IWorkerProcess
int Id { get; }

Task StartProcessAsync();

void WaitForProcessExitInMilliSeconds(int waitTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal class JobObjectRegistry : IProcessRegistry, IDisposable
{
private IntPtr _handle;
private bool _disposed = false;
private WorkerProcess _workerProcess = null;

public JobObjectRegistry()
{
Expand All @@ -37,9 +38,10 @@ public JobObjectRegistry()
}
}

public bool Register(Process proc)
public bool Register(WorkerProcess workerProcess)
{
return AssignProcessToJobObject(_handle, proc.Handle);
_workerProcess = workerProcess;
return AssignProcessToJobObject(_handle, _workerProcess.Process.Handle);
}

[DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
Expand Down Expand Up @@ -78,6 +80,8 @@ private void Dispose(bool disposing)

public void Close()
{
_workerProcess?.ProcessWaitingForTermination.Task.GetAwaiter().GetResult();

if (_handle != IntPtr.Zero)
{
CloseHandle(_handle);
Expand Down
16 changes: 13 additions & 3 deletions src/WebJobs.Script/Workers/ProcessManagement/WorkerProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -53,8 +54,9 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces

internal Queue<string> ProcessStdErrDataQueue => _processStdErrDataQueue;

// for testing
internal Process Process { get; set; }
public Process Process { get; set; }

public TaskCompletionSource<bool> ProcessWaitingForTermination { get; set; } = new TaskCompletionSource<bool>();

internal abstract Process CreateWorkerProcess();

Expand All @@ -78,7 +80,7 @@ public Task StartProcessAsync()
Process.BeginOutputReadLine();

// Register process only after it starts
_processRegistry?.Register(Process);
_processRegistry?.Register(this);

RegisterWithProcessMonitor();

Expand Down Expand Up @@ -204,10 +206,18 @@ internal void BuildAndLogConsoleLog(string msg, LogLevel level)

internal abstract void HandleWorkerProcessRestart();

public void WaitForProcessExitInMilliSeconds(int waitTime)
{
Process.WaitForExit(waitTime);
}

public void Dispose()
{
Disposing = true;
// best effort process disposal

ProcessWaitingForTermination.SetResult(false);

try
{
_eventSubscription?.Dispose();
Expand Down
1 change: 1 addition & 0 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static class RpcWorkerConstants
public const string AcceptsListOfFunctionLoadRequests = "AcceptsListOfFunctionLoadRequests";
public const string EnableUserCodeException = "EnableUserCodeException";
public const string SupportsLoadResponseCollection = "SupportsLoadResponseCollection";
public const string HandlesWorkerTerminateMessage = "HandlesWorkerTerminateMessage";

// Host Capabilities
public const string V2Compatable = "V2Compatable";
Expand Down
1 change: 1 addition & 0 deletions src/WebJobs.Script/Workers/WorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public static class WorkerConstants
public const string HttpScheme = "http";

public const int WorkerReadyCheckPollingIntervalMilliseconds = 25;
public const int WorkerTerminateGracePeriodInSeconds = 5;
public const string WorkerConfigFileName = "worker.config.json";
public const string DefaultWorkersDirectoryName = "workers";

Expand Down
42 changes: 42 additions & 0 deletions test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,48 @@ await Assert.ThrowsAsync<TaskCanceledException>(async () =>
});
}

[Fact]
public void WorkerChannel_Dispose_With_WorkerTerminateCapability()
{
var initTask = _workerChannel.StartWorkerProcessAsync(CancellationToken.None);

IDictionary<string, string> capabilities = new Dictionary<string, string>()
{
{ RpcWorkerConstants.HandlesWorkerTerminateMessage, "1" }
};

StartStream startStream = new StartStream()
{
WorkerId = _workerId
};

StreamingMessage startStreamMessage = new StreamingMessage()
{
StartStream = startStream
};

// Send worker init request and enable the capabilities
GrpcEvent rpcEvent = new GrpcEvent(_workerId, startStreamMessage);
_workerChannel.SendWorkerInitRequest(rpcEvent);
_testFunctionRpcService.PublishWorkerInitResponseEvent(capabilities);

_workerChannel.Dispose();
var traces = _logger.GetLogMessages();
var expectedLogMsg = $"Sending WorkerTerminate message with grace period {WorkerConstants.WorkerTerminateGracePeriodInSeconds} seconds.";
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, expectedLogMsg)));
}

[Fact]
public void WorkerChannel_Dispose_Without_WorkerTerminateCapability()
{
var initTask = _workerChannel.StartWorkerProcessAsync(CancellationToken.None);

_workerChannel.Dispose();
var traces = _logger.GetLogMessages();
var expectedLogMsg = $"Sending WorkerTerminate message with grace period {WorkerConstants.WorkerTerminateGracePeriodInSeconds} seconds.";
Assert.False(traces.Any(m => string.Equals(m.FormattedMessage, expectedLogMsg)));
}

[Fact]
public async Task StartWorkerProcessAsync_Invoked_SetupFunctionBuffers_Verify_ReadyForInvocation()
{
Expand Down
21 changes: 20 additions & 1 deletion test/WebJobs.Script.Tests/Workers/Rpc/RpcWorkerProcessTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class RpcWorkerProcessTests
private RpcWorkerProcess _rpcWorkerProcess;
private Mock<IScriptEventManager> _eventManager;
private Mock<IHostProcessMonitor> _hostProcessMonitorMock;
private TestLogger _logger = new TestLogger("test");

public RpcWorkerProcessTests()
{
Expand Down Expand Up @@ -45,7 +46,7 @@ public RpcWorkerProcessTests()
_eventManager.Object,
workerProcessFactory.Object,
processRegistry.Object,
new TestLogger("test"),
_logger,
languageWorkerConsoleLogSource.Object,
new TestMetricsLogger(),
serviceProviderMock.Object);
Expand Down Expand Up @@ -179,5 +180,23 @@ public void HandleWorkerProcessExitError_PublishesWorkerRestartEvent_OnIntention
_eventManager.Verify(_ => _.Publish(It.IsAny<WorkerRestartEvent>()), Times.Once());
_eventManager.Verify(_ => _.Publish(It.IsAny<WorkerErrorEvent>()), Times.Never());
}

[Fact]
public void WorkerProcess_Dispose()
{
Process process = new Process();
ProcessStartInfo startInfo = new ProcessStartInfo();
startInfo.WindowStyle = ProcessWindowStyle.Hidden;
startInfo.FileName = "cmd.exe";
startInfo.Arguments = $"ls";
process.StartInfo = startInfo;
process.Start();

_rpcWorkerProcess.Process = process;
_rpcWorkerProcess.Dispose();
var traces = _logger.GetLogMessages();
var disposeLogs = traces.Where(m => string.Equals(m.FormattedMessage, "Worker process has not exited despite waiting for 1000 ms"));
Assert.False(disposeLogs.Any());
}
}
}