Skip to content

Update capabilities and log worker metadata after specialization #9020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@

**Release sprint:** Sprint 132
[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+132%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+132%22+label%3Afeature+is%3Aclosed) ]
- Update App Service Authentication/Authorization on Linux Consumption from 1.4.17 to 1.5.1 [Release Note](https://github.com/Azure/app-service-announcements/issues/406)
- Update App Service Authentication/Authorization on Linux Consumption from 1.4.17 to 1.5.1 [Release Note](https://github.com/Azure/app-service-announcements/issues/406)
- Upgraded proto files to v1.7.1-protofile release (#9023)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is not part of this PR. I forgot to include release_notes update in 9023.

- Refresh capabilities & log worker metadata after specialization (#9020)
35 changes: 25 additions & 10 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
private TaskCompletionSource<List<RawFunctionMetadata>> _functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1);
private bool _isSharedMemoryDataTransferEnabled;
private bool _cancelCapabilityEnabled;
private bool? _cancelCapabilityEnabled;
private bool _isWorkerApplicationInsightsLoggingEnabled;

private System.Timers.Timer _timer;
Expand Down Expand Up @@ -357,6 +357,11 @@ internal WorkerInitRequest GetWorkerInitRequest()
internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadResponse res, IDisposable latencyEvent)
{
_workerChannelLogger.LogDebug("Received FunctionEnvironmentReloadResponse from WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id);

LogWorkerMetadata(res.WorkerMetadata);
UpdateCapabilities(res.Capabilities);
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));

if (res.Result.IsFailure(out Exception reloadEnvironmentVariablesException))
{
_workerChannelLogger.LogError(reloadEnvironmentVariablesException, "Failed to reload environment variables");
Expand All @@ -375,13 +380,10 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
_initMessage = initEvent.Message.WorkerInitResponse;
_workerChannelLogger.LogDebug("Worker capabilities: {capabilities}", _initMessage.Capabilities);

if (_initMessage.WorkerMetadata != null)
{
_initMessage.UpdateWorkerMetadata(_workerConfig);
var workerMetadata = _initMessage.WorkerMetadata.ToString();
_metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, workerMetadata);
_workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadata);
}
// In placeholder scenario, the capabilities and worker metadata will not be available
// until specialization is done (env reload request). So these can be removed from worker init response code path.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check our understanding here with @soninaren as I remember change in flow on placeholder side for the work - selective loading of extensions.

// to do to track this: https://github.com/Azure/azure-functions-host/issues/9019
LogWorkerMetadata(_initMessage.WorkerMetadata);

if (_initMessage.Result.IsFailure(out Exception exc))
{
Expand All @@ -395,7 +397,7 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
UpdateCapabilities(_initMessage.Capabilities);

_isSharedMemoryDataTransferEnabled = IsSharedMemoryDataTransferEnabled();
_cancelCapabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the value of this capability is set to a false-y string? false, 0, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this may not be part of this PR but can we update the name _cancelCapabilityEnabled to may be _invocationCancelCapabilityEnabled or something on similar lines. Only cancel capability is not communicating the meaning till I saw RpcWorkerConstants.HandlesInvocationCancelMessage

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, host code(in various places) checks the existence of the capability instead of the value being truthy. I agree, Ideally it should be a truthy check. but that might involve coordinating with workers to make sure they send the correct value.

Currently dotnet worker sends capability when the value is truthy. Ex: https://github.com/Azure/azure-functions-dotnet-worker/blob/14cc88859a1b7bbf68ee43409a1ad45a6e8362cb/src/DotNetWorker.Grpc/GrpcWorker.cs#L222-L229


if (!_isSharedMemoryDataTransferEnabled)
{
Expand All @@ -413,6 +415,19 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
_workerInitTask.TrySetResult(true);
}

private void LogWorkerMetadata(WorkerMetadata workerMetadata)
{
if (workerMetadata == null)
{
return;
}

workerMetadata.UpdateWorkerMetadata(_workerConfig);
var workerMetadataString = workerMetadata.ToString();
_metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, workerMetadataString);
_workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadataString);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be applicable for the scenario here but wanted to call out that for one of my previous PRs - refer, I got to know that adding log lines here has perf impact. So, we can double check if this log is really needed here and not affecting the perf.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logging was existing code, which I moved to a private method now. So this PR should not make a perf impact as such.

But I get your point. We can definitely discuss removing this logging entry as we have this information getting logged to functions metrics as well. Thoughts @liliankasem ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if a detector or other teams are using the information from the log somewhere so I can't say it's 100% safe to remove

}

// Allow tests to add capabilities, even if not directly supported by the worker.
internal virtual void UpdateCapabilities(IDictionary<string, string> fields)
{
Expand Down Expand Up @@ -668,7 +683,7 @@ await SendStreamingMessageAsync(new StreamingMessage
InvocationRequest = invocationRequest
});

if (_cancelCapabilityEnabled)
if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value)
{
context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ public static Tuple<string, string, CookieOptions> RpcHttpCookieConverter(RpcHtt
return new Tuple<string, string, CookieOptions>(cookie.Name, cookie.Value, cookieOptions);
}

internal static void UpdateWorkerMetadata(this WorkerInitResponse initResponse, RpcWorkerConfig workerConfig)
internal static void UpdateWorkerMetadata(this WorkerMetadata workerMetadata, RpcWorkerConfig workerConfig)
{
initResponse.WorkerMetadata.RuntimeName = string.IsNullOrEmpty(initResponse.WorkerMetadata.RuntimeName)
? workerConfig.Description.Language : initResponse.WorkerMetadata.RuntimeName;
initResponse.WorkerMetadata.RuntimeVersion = string.IsNullOrEmpty(initResponse.WorkerMetadata.RuntimeVersion)
? workerConfig.Description.DefaultRuntimeVersion : initResponse.WorkerMetadata.RuntimeVersion;
workerMetadata.RuntimeName = string.IsNullOrEmpty(workerMetadata.RuntimeName)
? workerConfig.Description.Language : workerMetadata.RuntimeName;
workerMetadata.RuntimeVersion = string.IsNullOrEmpty(workerMetadata.RuntimeVersion)
? workerConfig.Description.DefaultRuntimeVersion : workerMetadata.RuntimeVersion;
}

private static SameSiteMode RpcSameSiteEnumConverter(RpcHttpCookie.Types.SameSite sameSite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ public async Task SendSendFunctionEnvironmentReloadRequest_PublishesOutboundEven
Environment.SetEnvironmentVariable("TestNull", null);
Environment.SetEnvironmentVariable("TestEmpty", string.Empty);
Environment.SetEnvironmentVariable("TestValid", "TestValue");
_testFunctionRpcService.AutoReply(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest);
_testFunctionRpcService.AutoReply(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest, workerSupportsSpecialization: true);
var pending = _workerChannel.SendFunctionEnvironmentReloadRequest();
await Task.Delay(500);
await pending; // this can timeout
Expand All @@ -718,6 +718,14 @@ public async Task SendSendFunctionEnvironmentReloadRequest_PublishesOutboundEven
ShowOutput(traces);
var functionLoadLogs = traces.Where(m => string.Equals(m.FormattedMessage, "Sending FunctionEnvironmentReloadRequest to WorkerProcess with Pid: '910'"));
Assert.Equal(1, functionLoadLogs.Count());

// for specialization use case, env reload response include worker metadata and capabilities.
var metatadataLog = traces.Where(m => string.Equals(m.FormattedMessage,
@"Worker metadata: { ""runtimeName"": "".NET"", ""runtimeVersion"": ""7.0"", ""workerVersion"": ""1.0.0"", ""workerBitness"": ""x64"" }"));
var capabilityUpdateLog = traces.Where(m => string.Equals(m.FormattedMessage,
@"Updating capabilities: { ""RpcHttpBodyOnly"": ""True"", ""TypedDataCollection"": ""True"" }"));
Assert.Single(metatadataLog);
Assert.Single(capabilityUpdateLog);
}

[Fact]
Expand Down
26 changes: 19 additions & 7 deletions test/WebJobs.Script.Tests/Workers/Rpc/TestFunctionRpcService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Rpc
{
Expand Down Expand Up @@ -41,12 +39,12 @@ public TestFunctionRpcService(IScriptEventManager eventManager, string workerId,
public void OnMessage(StreamingMessage.ContentOneofCase messageType, Action<OutboundGrpcEvent> callback)
=> _handlers.AddOrUpdate(messageType, callback, (messageType, oldValue) => oldValue + callback);

public void AutoReply(StreamingMessage.ContentOneofCase messageType)
public void AutoReply(StreamingMessage.ContentOneofCase messageType, bool workerSupportsSpecialization = false)
{
// apply standard default responses
Action<OutboundGrpcEvent> callback = messageType switch
{
StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest => _ => PublishFunctionEnvironmentReloadResponseEvent(),
StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest => _ => PublishFunctionEnvironmentReloadResponseEvent(workerSupportsSpecialization),
_ => null,
};
if (callback is not null)
Expand Down Expand Up @@ -178,13 +176,27 @@ public void PublishFunctionLoadResponsesEvent(List<string> functionIds, StatusRe
Write(responseMessage);
}

public void PublishFunctionEnvironmentReloadResponseEvent()
public void PublishFunctionEnvironmentReloadResponseEvent(bool workerSupportsSpecialization)
{
FunctionEnvironmentReloadResponse relaodEnvResponse = GetTestFunctionEnvReloadResponse();
FunctionEnvironmentReloadResponse reloadEnvResponse = GetTestFunctionEnvReloadResponse();
StreamingMessage responseMessage = new StreamingMessage()
{
FunctionEnvironmentReloadResponse = relaodEnvResponse
FunctionEnvironmentReloadResponse = reloadEnvResponse
};

if (workerSupportsSpecialization)
{
responseMessage.FunctionEnvironmentReloadResponse.WorkerMetadata = new()
{
RuntimeName = ".NET",
RuntimeVersion = "7.0",
WorkerVersion = "1.0.0",
WorkerBitness = "x64"
};
responseMessage.FunctionEnvironmentReloadResponse.Capabilities.Add("RpcHttpBodyOnly", bool.TrueString);
responseMessage.FunctionEnvironmentReloadResponse.Capabilities.Add("TypedDataCollection", bool.TrueString);
}

Write(responseMessage);
}

Expand Down