Skip to content

Commit afafa87

Browse files
authored
Update capabilities and log worker metadata after specialization (#9020)
* Update capabilities and log worker metadata in env reload request. * Updated tests * Release notes * Updated release notes to include proto upgrade change
1 parent 11a0add commit afafa87

File tree

5 files changed

+61
-24
lines changed

5 files changed

+61
-24
lines changed

release_notes.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@
1616

1717
**Release sprint:** Sprint 132
1818
[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+132%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+132%22+label%3Afeature+is%3Aclosed) ]
19-
- Update App Service Authentication/Authorization on Linux Consumption from 1.4.17 to 1.5.1 [Release Note](https://github.com/Azure/app-service-announcements/issues/406)
19+
- Update App Service Authentication/Authorization on Linux Consumption from 1.4.17 to 1.5.1 [Release Note](https://github.com/Azure/app-service-announcements/issues/406)
20+
- Upgraded proto files to v1.7.1-protofile release (#9023)
21+
- Refresh capabilities & log worker metadata after specialization (#9020)

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
7878
private TaskCompletionSource<List<RawFunctionMetadata>> _functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);
7979
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1);
8080
private bool _isSharedMemoryDataTransferEnabled;
81-
private bool _cancelCapabilityEnabled;
81+
private bool? _cancelCapabilityEnabled;
8282
private bool _isWorkerApplicationInsightsLoggingEnabled;
8383

8484
private System.Timers.Timer _timer;
@@ -357,6 +357,11 @@ internal WorkerInitRequest GetWorkerInitRequest()
357357
internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadResponse res, IDisposable latencyEvent)
358358
{
359359
_workerChannelLogger.LogDebug("Received FunctionEnvironmentReloadResponse from WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id);
360+
361+
LogWorkerMetadata(res.WorkerMetadata);
362+
UpdateCapabilities(res.Capabilities);
363+
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
364+
360365
if (res.Result.IsFailure(out Exception reloadEnvironmentVariablesException))
361366
{
362367
_workerChannelLogger.LogError(reloadEnvironmentVariablesException, "Failed to reload environment variables");
@@ -375,13 +380,10 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
375380
_initMessage = initEvent.Message.WorkerInitResponse;
376381
_workerChannelLogger.LogDebug("Worker capabilities: {capabilities}", _initMessage.Capabilities);
377382

378-
if (_initMessage.WorkerMetadata != null)
379-
{
380-
_initMessage.UpdateWorkerMetadata(_workerConfig);
381-
var workerMetadata = _initMessage.WorkerMetadata.ToString();
382-
_metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, workerMetadata);
383-
_workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadata);
384-
}
383+
// In placeholder scenario, the capabilities and worker metadata will not be available
384+
// until specialization is done (env reload request). So these can be removed from worker init response code path.
385+
// to do to track this: https://github.com/Azure/azure-functions-host/issues/9019
386+
LogWorkerMetadata(_initMessage.WorkerMetadata);
385387

386388
if (_initMessage.Result.IsFailure(out Exception exc))
387389
{
@@ -395,7 +397,7 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
395397
UpdateCapabilities(_initMessage.Capabilities);
396398

397399
_isSharedMemoryDataTransferEnabled = IsSharedMemoryDataTransferEnabled();
398-
_cancelCapabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
400+
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
399401

400402
if (!_isSharedMemoryDataTransferEnabled)
401403
{
@@ -413,6 +415,19 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
413415
_workerInitTask.TrySetResult(true);
414416
}
415417

418+
private void LogWorkerMetadata(WorkerMetadata workerMetadata)
419+
{
420+
if (workerMetadata == null)
421+
{
422+
return;
423+
}
424+
425+
workerMetadata.UpdateWorkerMetadata(_workerConfig);
426+
var workerMetadataString = workerMetadata.ToString();
427+
_metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, workerMetadataString);
428+
_workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadataString);
429+
}
430+
416431
// Allow tests to add capabilities, even if not directly supported by the worker.
417432
internal virtual void UpdateCapabilities(IDictionary<string, string> fields)
418433
{
@@ -668,7 +683,7 @@ await SendStreamingMessageAsync(new StreamingMessage
668683
InvocationRequest = invocationRequest
669684
});
670685

671-
if (_cancelCapabilityEnabled)
686+
if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value)
672687
{
673688
context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
674689
}

src/WebJobs.Script.Grpc/MessageExtensions/GrpcMessageExtensionUtilities.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ public static Tuple<string, string, CookieOptions> RpcHttpCookieConverter(RpcHtt
7878
return new Tuple<string, string, CookieOptions>(cookie.Name, cookie.Value, cookieOptions);
7979
}
8080

81-
internal static void UpdateWorkerMetadata(this WorkerInitResponse initResponse, RpcWorkerConfig workerConfig)
81+
internal static void UpdateWorkerMetadata(this WorkerMetadata workerMetadata, RpcWorkerConfig workerConfig)
8282
{
83-
initResponse.WorkerMetadata.RuntimeName = string.IsNullOrEmpty(initResponse.WorkerMetadata.RuntimeName)
84-
? workerConfig.Description.Language : initResponse.WorkerMetadata.RuntimeName;
85-
initResponse.WorkerMetadata.RuntimeVersion = string.IsNullOrEmpty(initResponse.WorkerMetadata.RuntimeVersion)
86-
? workerConfig.Description.DefaultRuntimeVersion : initResponse.WorkerMetadata.RuntimeVersion;
83+
workerMetadata.RuntimeName = string.IsNullOrEmpty(workerMetadata.RuntimeName)
84+
? workerConfig.Description.Language : workerMetadata.RuntimeName;
85+
workerMetadata.RuntimeVersion = string.IsNullOrEmpty(workerMetadata.RuntimeVersion)
86+
? workerConfig.Description.DefaultRuntimeVersion : workerMetadata.RuntimeVersion;
8787
}
8888

8989
private static SameSiteMode RpcSameSiteEnumConverter(RpcHttpCookie.Types.SameSite sameSite)

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ public async Task SendSendFunctionEnvironmentReloadRequest_PublishesOutboundEven
702702
Environment.SetEnvironmentVariable("TestNull", null);
703703
Environment.SetEnvironmentVariable("TestEmpty", string.Empty);
704704
Environment.SetEnvironmentVariable("TestValid", "TestValue");
705-
_testFunctionRpcService.AutoReply(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest);
705+
_testFunctionRpcService.AutoReply(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest, workerSupportsSpecialization: true);
706706
var pending = _workerChannel.SendFunctionEnvironmentReloadRequest();
707707
await Task.Delay(500);
708708
await pending; // this can timeout
@@ -718,6 +718,14 @@ public async Task SendSendFunctionEnvironmentReloadRequest_PublishesOutboundEven
718718
ShowOutput(traces);
719719
var functionLoadLogs = traces.Where(m => string.Equals(m.FormattedMessage, "Sending FunctionEnvironmentReloadRequest to WorkerProcess with Pid: '910'"));
720720
Assert.Equal(1, functionLoadLogs.Count());
721+
722+
// for specialization use case, env reload response include worker metadata and capabilities.
723+
var metatadataLog = traces.Where(m => string.Equals(m.FormattedMessage,
724+
@"Worker metadata: { ""runtimeName"": "".NET"", ""runtimeVersion"": ""7.0"", ""workerVersion"": ""1.0.0"", ""workerBitness"": ""x64"" }"));
725+
var capabilityUpdateLog = traces.Where(m => string.Equals(m.FormattedMessage,
726+
@"Updating capabilities: { ""RpcHttpBodyOnly"": ""True"", ""TypedDataCollection"": ""True"" }"));
727+
Assert.Single(metatadataLog);
728+
Assert.Single(capabilityUpdateLog);
721729
}
722730

723731
[Fact]

test/WebJobs.Script.Tests/Workers/Rpc/TestFunctionRpcService.cs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
using Microsoft.Azure.WebJobs.Script.Eventing;
1212
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
1313
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
14-
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
1514
using Microsoft.Extensions.Logging;
16-
using Newtonsoft.Json.Linq;
1715

1816
namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Rpc
1917
{
@@ -41,12 +39,12 @@ public TestFunctionRpcService(IScriptEventManager eventManager, string workerId,
4139
public void OnMessage(StreamingMessage.ContentOneofCase messageType, Action<OutboundGrpcEvent> callback)
4240
=> _handlers.AddOrUpdate(messageType, callback, (messageType, oldValue) => oldValue + callback);
4341

44-
public void AutoReply(StreamingMessage.ContentOneofCase messageType)
42+
public void AutoReply(StreamingMessage.ContentOneofCase messageType, bool workerSupportsSpecialization = false)
4543
{
4644
// apply standard default responses
4745
Action<OutboundGrpcEvent> callback = messageType switch
4846
{
49-
StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest => _ => PublishFunctionEnvironmentReloadResponseEvent(),
47+
StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest => _ => PublishFunctionEnvironmentReloadResponseEvent(workerSupportsSpecialization),
5048
_ => null,
5149
};
5250
if (callback is not null)
@@ -178,13 +176,27 @@ public void PublishFunctionLoadResponsesEvent(List<string> functionIds, StatusRe
178176
Write(responseMessage);
179177
}
180178

181-
public void PublishFunctionEnvironmentReloadResponseEvent()
179+
public void PublishFunctionEnvironmentReloadResponseEvent(bool workerSupportsSpecialization)
182180
{
183-
FunctionEnvironmentReloadResponse relaodEnvResponse = GetTestFunctionEnvReloadResponse();
181+
FunctionEnvironmentReloadResponse reloadEnvResponse = GetTestFunctionEnvReloadResponse();
184182
StreamingMessage responseMessage = new StreamingMessage()
185183
{
186-
FunctionEnvironmentReloadResponse = relaodEnvResponse
184+
FunctionEnvironmentReloadResponse = reloadEnvResponse
187185
};
186+
187+
if (workerSupportsSpecialization)
188+
{
189+
responseMessage.FunctionEnvironmentReloadResponse.WorkerMetadata = new()
190+
{
191+
RuntimeName = ".NET",
192+
RuntimeVersion = "7.0",
193+
WorkerVersion = "1.0.0",
194+
WorkerBitness = "x64"
195+
};
196+
responseMessage.FunctionEnvironmentReloadResponse.Capabilities.Add("RpcHttpBodyOnly", bool.TrueString);
197+
responseMessage.FunctionEnvironmentReloadResponse.Capabilities.Add("TypedDataCollection", bool.TrueString);
198+
}
199+
188200
Write(responseMessage);
189201
}
190202

0 commit comments

Comments
 (0)