Skip to content

Fix GrpcWorkerChannel.StartWorkerProcessAsync timeout #10937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: dev
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@

<Import Project="$(EngBuildRoot)Engineering.targets" />

<PropertyGroup>
<VSTestResultsDirectory>$(ArtifactsPath)/log/$(ArtifactsProjectName)/tests_$(ArtifactsPivots)/</VSTestResultsDirectory>
</PropertyGroup>

</Project>
4 changes: 2 additions & 2 deletions eng/ci/templates/jobs/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
displayName: Publish deps.json
path: $(Build.ArtifactStagingDirectory)
artifact: WebHost_Deps
condition: failed()
condition: and(failed(), eq(variables['System.JobAttempt'], 1)) # only publish on first attempt

steps:
- template: /eng/ci/templates/install-dotnet.yml@self
Expand All @@ -20,7 +20,7 @@ jobs:
inputs:
command: test
testRunTitle: Unit Tests
arguments: -v n
arguments: -v n --blame
projects: |
**\ExtensionsMetadataGeneratorTests.csproj
**\WebJobs.Script.Tests.csproj
Expand Down
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
-->
- Memory allocation optimizations in `ScriptStartupTypeLocator.GetExtensionsStartupTypesAsync` (#11012)
- Fix invocation timeout when incoming request contains "x-ms-invocation-id" header (#10980)
- Throw exception instead of timing out when worker channel exits before initializing gRPC (#10937)
- Warn if .azurefunctions folder does not exist (#10967)
- Memory allocation & CPU optimizations in `GrpcMessageExtensionUtilities.ConvertFromHttpMessageToExpando` (#11054)
29 changes: 23 additions & 6 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,36 @@ private void DispatchMessage(InboundGrpcEvent msg)

public bool IsChannelReadyForInvocations()
{
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
return !_disposing && !_disposed
&& _state.HasFlag(
RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
}

public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
{
RegisterCallbackForNextGrpcMessage(MsgType.StartStream, _workerConfig.CountOptions.ProcessStartupTimeout, 1, SendWorkerInitRequest, HandleWorkerStartStreamError);
// note: it is important that the ^^^ StartStream is in place *before* we start process the loop, otherwise we get a race condition
RegisterCallbackForNextGrpcMessage(
MsgType.StartStream,
_workerConfig.CountOptions.ProcessStartupTimeout,
count: 1,
SendWorkerInitRequest,
HandleWorkerStartStreamError);

// note: it is important that the ^^^ StartStream callback is in place *before* we start the inbound processing loop,
// otherwise we get a race condition
_ = ProcessInbound();

_workerChannelLogger.LogDebug("Initiating Worker Process start up");
await _rpcWorkerProcess.StartProcessAsync();
_state = _state | RpcWorkerChannelState.Initializing;
await _workerInitTask.Task;
await _rpcWorkerProcess.StartProcessAsync(cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the important change: we now wait for either the worker to fully initialize (gRPC connection established) or the worker process to exit (in which case we re-throw any failure the worker experienced).

_state |= RpcWorkerChannelState.Initializing;
Task exited = _rpcWorkerProcess.WaitForExitAsync(cancellationToken);
Task winner = await Task.WhenAny(_workerInitTask.Task, exited).WaitAsync(cancellationToken);
await winner;

if (winner == exited)
{
// process exited without throwing. We need to throw to indicate process is not running.
throw new WorkerProcessExitException("Worker process exited before initializing.");
}
}

public async Task<WorkerStatus> GetWorkerStatusAsync()
Expand Down
2 changes: 0 additions & 2 deletions src/WebJobs.Script/Config/ExtensionRequirementOptions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Script.ExtensionRequirements;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;

namespace Microsoft.Azure.WebJobs.Script.Config
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Configuration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,37 @@ public sealed class ScriptStartupTypeLocator : IWebJobsStartupTypeLocator
private readonly IExtensionBundleManager _extensionBundleManager;
private readonly IFunctionMetadataManager _functionMetadataManager;
private readonly IMetricsLogger _metricsLogger;
private readonly Lazy<IEnumerable<Type>> _startupTypes;
private readonly IEnvironment _environment;
private readonly IOptions<ExtensionRequirementOptions> _extensionRequirementOptions;
private readonly Lazy<IEnumerable<Type>> _startupTypes;
private static string[] _builtinExtensionAssemblies = GetBuiltinExtensionAssemblies();

public ScriptStartupTypeLocator(string rootScriptPath, ILogger<ScriptStartupTypeLocator> logger, IExtensionBundleManager extensionBundleManager,
IFunctionMetadataManager functionMetadataManager, IMetricsLogger metricsLogger, IOptions<ExtensionRequirementOptions> extensionRequirementOptions)
public ScriptStartupTypeLocator(
string rootScriptPath,
ILogger<ScriptStartupTypeLocator> logger,
IExtensionBundleManager extensionBundleManager,
IFunctionMetadataManager functionMetadataManager,
IMetricsLogger metricsLogger,
IEnvironment environment,
IOptions<ExtensionRequirementOptions> extensionRequirementOptions)
{
_rootScriptPath = rootScriptPath ?? throw new ArgumentNullException(nameof(rootScriptPath));
_extensionBundleManager = extensionBundleManager ?? throw new ArgumentNullException(nameof(extensionBundleManager));
_logger = logger;
_functionMetadataManager = functionMetadataManager;
_metricsLogger = metricsLogger;
_startupTypes = new Lazy<IEnumerable<Type>>(() => GetExtensionsStartupTypesAsync().ConfigureAwait(false).GetAwaiter().GetResult());
_environment = environment;
_extensionRequirementOptions = extensionRequirementOptions;
_startupTypes = new Lazy<IEnumerable<Type>>(() => GetExtensionsStartupTypesAsync().ConfigureAwait(false).GetAwaiter().GetResult());
}

private static string[] GetBuiltinExtensionAssemblies()
{
return new[]
{
return
[
typeof(WebJobs.Extensions.Http.HttpWebJobsStartup).Assembly.GetName().Name,
typeof(WebJobs.Extensions.ExtensionsWebJobsStartup).Assembly.GetName().Name
};
];
}

public Type[] GetStartupTypes()
Expand Down Expand Up @@ -102,11 +110,11 @@ public async Task<IEnumerable<Type>> GetExtensionsStartupTypesAsync()
}
}

bool isDotnetIsolatedApp = Utility.IsDotnetIsolatedApp(SystemEnvironment.Instance, functionMetadataCollection);
bool isDotnetIsolatedApp = Utility.IsDotnetIsolatedApp(_environment, functionMetadataCollection);
bool isDotnetApp = isPrecompiledFunctionApp || isDotnetIsolatedApp;
var isLogicApp = SystemEnvironment.Instance.IsLogicApp();
var isLogicApp = _environment.IsLogicApp();

if (SystemEnvironment.Instance.IsPlaceholderModeEnabled())
if (_environment.IsPlaceholderModeEnabled())
{
// Do not move this.
// Calling this log statement in the placeholder mode to avoid jitting during specialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,19 @@ private static IOpenTelemetryBuilder ConfigureMetrics(this IOpenTelemetryBuilder
return builder.WithMetrics(builder =>
{
builder.AddAspNetCoreInstrumentation()
.AddRuntimeInstrumentation()
.AddProcessInstrumentation()
.AddMeter(HostMetrics.FaasMeterName)
.AddView(HostMetrics.FaasInvokeDuration, new ExplicitBucketHistogramConfiguration
{
Boundaries = new double[] { 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10 }
});
.AddRuntimeInstrumentation()
.AddProcessInstrumentation()
.AddMeter(HostMetrics.FaasMeterName)
.AddView(HostMetrics.FaasInvokeDuration, new ExplicitBucketHistogramConfiguration
{
Boundaries = new double[] { 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10 }
});

if (enableOtlp)
{
builder.AddOtlpExporter();
}

if (enableAzureMonitor)
{
builder.AddAzureMonitorMetricExporter(opt => ConfigureAzureMonitorOptions(opt, azMonConnectionString, credential));
Expand Down
21 changes: 16 additions & 5 deletions src/WebJobs.Script/Extensions/ExceptionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Reflection;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Azure.WebJobs.Host.Diagnostics;
Expand Down Expand Up @@ -32,12 +33,22 @@ or SEHException

public static string ToFormattedString(this Exception exception)
{
if (exception == null)
ArgumentNullException.ThrowIfNull(exception);
return ExceptionFormatter.GetFormattedException(exception);
}

public static void ThrowIfErrorsPresent(IList<Exception> exceptions, string message = null)
{
switch (exceptions)
{
throw new ArgumentNullException(nameof(exception));
case null or []:
return;
case [Exception e]:
ExceptionDispatchInfo.Capture(e).Throw();
return;
default:
throw new AggregateException(message, exceptions);
}

return ExceptionFormatter.GetFormattedException(exception);
}
}
}
4 changes: 2 additions & 2 deletions src/WebJobs.Script/Host/IWorkerFunctionMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
namespace Microsoft.Azure.WebJobs.Script
{
/// <summary>
/// Defines an interface for fetching function metadata from Out-of-Proc language workers
/// Defines an interface for fetching function metadata from Out-of-Proc language workers.
/// </summary>
internal interface IWorkerFunctionMetadataProvider
{
ImmutableDictionary<string, ImmutableArray<string>> FunctionErrors { get; }

/// <summary>
/// Attempts to get function metadata from Out-of-Proc language workers
/// Attempts to get function metadata from Out-of-Proc language workers.
/// </summary>
/// <returns>FunctionMetadataResult that either contains the function metadata or indicates that a fallback option for fetching metadata should be used.</returns>
Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<RpcWorkerConfig> workerConfigs, bool forceRefresh = false);
Expand Down
6 changes: 3 additions & 3 deletions src/WebJobs.Script/Host/ScriptHostState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Microsoft.Azure.WebJobs.Script
public enum ScriptHostState
{
/// <summary>
/// The host has not yet been created
/// The host has not yet been created.
/// </summary>
Default,

Expand All @@ -28,7 +28,7 @@ public enum ScriptHostState
Running,

/// <summary>
/// The host is in an error state
/// The host is in an error state.
/// </summary>
Error,

Expand All @@ -43,7 +43,7 @@ public enum ScriptHostState
Stopped,

/// <summary>
/// The host is offline
/// The host is offline.
/// </summary>
Offline
}
Expand Down
9 changes: 7 additions & 2 deletions src/WebJobs.Script/Host/WorkerFunctionMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
// forceRefresh will be false when bundle is not used (dotnet and dotnet-isolated).
if (!_environment.IsPlaceholderModeEnabled() && forceRefresh && !_scriptOptions.CurrentValue.IsFileSystemReadOnly)
{
_channelManager.ShutdownChannelsAsync().GetAwaiter().GetResult();
await _channelManager.ShutdownChannelsAsync();
}

var channels = _channelManager.GetChannels(_workerRuntime);
Expand Down Expand Up @@ -108,6 +108,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
throw new InvalidOperationException($"No initialized language worker channel found for runtime: {_workerRuntime}.");
}

List<Exception> errors = null;
foreach (string workerId in channels.Keys.ToList())
{
if (channels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> languageWorkerChannelTask))
Expand All @@ -130,7 +131,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
}

_functions = functions.ToImmutableArray();
_logger.FunctionsReturnedByProvider(_functions.IsDefault ? 0 : _functions.Count(), _metadataProviderName);
_logger.FunctionsReturnedByProvider(_functions.Length, _metadataProviderName);

// Validate if the app has functions in legacy format and add in logs to inform about the mixed app
_ = Task.Delay(TimeSpan.FromMinutes(1)).ContinueWith(t => ValidateFunctionAppFormat(_scriptOptions.CurrentValue.ScriptPath, _logger, _environment));
Expand All @@ -141,9 +142,13 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
{
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
await _channelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
errors ??= [];
errors.Add(ex);
}
}
}

ExceptionExtensions.ThrowIfErrorsPresent(errors, "Errors getting function metadata from workers.");
}

return new FunctionMetadataResult(useDefaultMetadataIndexing: false, _functions);
Expand Down
9 changes: 8 additions & 1 deletion src/WebJobs.Script/ScriptHostBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,14 @@ public static IHostBuilder AddScriptHost(this IHostBuilder builder,
var bundleManager = new ExtensionBundleManager(extensionBundleOptions, SystemEnvironment.Instance, loggerFactory, configOption);
var metadataServiceManager = applicationOptions.RootServiceProvider.GetService<IFunctionMetadataManager>();

var locator = new ScriptStartupTypeLocator(applicationOptions.ScriptPath, loggerFactory.CreateLogger<ScriptStartupTypeLocator>(), bundleManager, metadataServiceManager, metricsLogger, extensionRequirementOptions);
var locator = new ScriptStartupTypeLocator(
applicationOptions.ScriptPath,
loggerFactory.CreateLogger<ScriptStartupTypeLocator>(),
bundleManager,
metadataServiceManager,
metricsLogger,
SystemEnvironment.Instance,
extensionRequirementOptions);

// The locator (and thus the bundle manager) need to be created now in order to configure app configuration.
// Store them so they do not need to be re-created later when configuring services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Scale;

namespace Microsoft.Azure.WebJobs.Script.Workers
{
Expand All @@ -13,7 +13,9 @@ public interface IWorkerProcess

Process Process { get; }

Task StartProcessAsync();
Task StartProcessAsync(CancellationToken cancellationToken = default);

Task WaitForExitAsync(CancellationToken cancellationToken = default);

void WaitForProcessExitInMilliSeconds(int waitTime);
}
Expand Down
Loading