Skip to content
3 changes: 3 additions & 0 deletions src/WorkerChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export class WorkerChannel {
#preInvocationHooks: HookCallback[] = [];
#postInvocationHooks: HookCallback[] = [];
#appStartHooks: HookCallback[] = [];
#appTerminateHooks: HookCallback[] = [];
functions: { [id: string]: RegisteredFunction } = {};
hasIndexedFunctions = false;

Expand Down Expand Up @@ -112,6 +113,8 @@ export class WorkerChannel {
return this.#postInvocationHooks;
case 'appStart':
return this.#appStartHooks;
case 'appTerminate':
return this.#appTerminateHooks;
default:
throw new RangeError(`Unrecognized hook "${hookName}"`);
}
Expand Down
1 change: 1 addition & 0 deletions src/eventHandlers/WorkerInitHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class WorkerInitHandler extends EventHandler<'workerInitRequest', 'worker
UseNullableValueDictionaryForHttp: 'true',
WorkerStatus: 'true',
TypedDataCollection: 'true',
HandlesWorkerTerminateMessage: 'true',
};

return response;
Expand Down
37 changes: 37 additions & 0 deletions src/eventHandlers/terminateWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AppTerminateContext } from '@azure/functions-core';
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { ReadOnlyError } from '../utils/ReadOnlyError';
import { WorkerChannel } from '../WorkerChannel';
import LogCategory = rpc.RpcLog.RpcLogCategory;
import LogLevel = rpc.RpcLog.Level;

export async function terminateWorker(channel: WorkerChannel, _msg: rpc.IWorkerTerminate) {
channel.log({
message: 'Received workerTerminate message; gracefully shutting down worker',
level: LogLevel.Debug,
logCategory: LogCategory.System,
});

const appTerminateContext: AppTerminateContext = {
get hookData() {
return channel.appLevelOnlyHookData;
},
set hookData(_obj) {
throw new ReadOnlyError('hookData');
},
get appHookData() {
return channel.appHookData;
},
set appHookData(_obj) {
throw new ReadOnlyError('appHookData');
},
};

await channel.executeHooks('appTerminate', appTerminateContext);

channel.eventStream.end();
process.exit(0);
}
7 changes: 6 additions & 1 deletion src/setupEventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { FunctionEnvironmentReloadHandler } from './eventHandlers/FunctionEnviro
import { FunctionLoadHandler } from './eventHandlers/FunctionLoadHandler';
import { FunctionsMetadataHandler } from './eventHandlers/FunctionsMetadataHandler';
import { InvocationHandler } from './eventHandlers/InvocationHandler';
import { terminateWorker } from './eventHandlers/terminateWorker';
import { WorkerInitHandler } from './eventHandlers/WorkerInitHandler';
import { ensureErrorType } from './utils/ensureErrorType';
import { InternalException } from './utils/InternalException';
Expand Down Expand Up @@ -66,6 +67,11 @@ async function handleMessage(workerId: string, channel: WorkerChannel, inMsg: rp
case 'workerInitRequest':
eventHandler = new WorkerInitHandler();
break;
case 'workerTerminate':
// Worker terminate request is a special request which gracefully shuts down worker
// It doesn't have a response so we don't have an EventHandler class for it
await terminateWorker(channel, nonNullProp(inMsg, eventName));
return;
case 'workerStatusRequest':
// Worker sends the host empty response to evaluate the worker's latency
// The response doesn't even allow a `result` property, which is why we don't implement an EventHandler class
Expand All @@ -81,7 +87,6 @@ async function handleMessage(workerId: string, channel: WorkerChannel, inMsg: rp
case 'invocationCancel':
case 'startStream':
case 'workerHeartbeat':
case 'workerTerminate':
// Not yet implemented
return;
default:
Expand Down
113 changes: 113 additions & 0 deletions test/eventHandlers/InvocationHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { LegacyFunctionLoader } from '../../src/LegacyFunctionLoader';
import { WorkerChannel } from '../../src/WorkerChannel';
import { Msg as AppStartMsg } from '../startApp.test';
import { beforeEventHandlerSuite } from './beforeEventHandlerSuite';
import { Msg as WorkerTerminateMsg } from './terminateWorker.test';
import { TestEventStream } from './TestEventStream';
import { Msg as WorkerInitMsg } from './WorkerInitHandler.test';
import LogCategory = rpc.RpcLog.RpcLogCategory;
Expand Down Expand Up @@ -333,13 +334,15 @@ describe('InvocationHandler', () => {
let channel: WorkerChannel;
let coreApi: typeof coreTypes;
let testDisposables: coreTypes.Disposable[] = [];
let processExitStub: sinon.SinonStub;

before(async () => {
const result = beforeEventHandlerSuite();
stream = result.stream;
loader = <TestFunctionLoader>result.loader;
channel = result.channel;
coreApi = await import('@azure/functions-core');
processExitStub = sinon.stub(process, 'exit');
});

beforeEach(async () => {
Expand All @@ -354,6 +357,10 @@ describe('InvocationHandler', () => {
testDisposables = [];
});

after(() => {
processExitStub.restore();
});

function sendInvokeMessage(inputData?: rpc.IParameterBinding[] | null): void {
stream.addTestMessage({
requestId: 'testReqId',
Expand Down Expand Up @@ -924,6 +931,57 @@ describe('InvocationHandler', () => {
expect(hookData).to.equal('appStartpreInvocpostInvoc');
});

it('appHookData changes from invocation hooks are persisted in app terminate hook contexts', async () => {
const expectedAppHookData = {
hello: 'world',
test: {
test2: 3,
},
};

loader.getFunction.returns({ callback: async () => {}, metadata: Binding.queue });

testDisposables.push(
coreApi.registerHook('preInvocation', (context: coreTypes.PreInvocationContext) => {
Object.assign(context.appHookData, expectedAppHookData);
hookData += 'preInvoc';
})
);

testDisposables.push(
coreApi.registerHook('postInvocation', (context: coreTypes.PostInvocationContext) => {
expect(context.appHookData).to.deep.equal(expectedAppHookData);
hookData += 'postInvoc';
})
);

sendInvokeMessage([InputData.http]);
await stream.assertCalledWith(
Msg.receivedInvocLog(),
Msg.executingHooksLog(1, 'preInvocation'),
Msg.executedHooksLog('preInvocation'),
Msg.executingHooksLog(1, 'postInvocation'),
Msg.executedHooksLog('postInvocation'),
Msg.invocResponse([])
);

const terminateFunc = sinon.spy((context: coreTypes.AppTerminateContext) => {
expect(context.appHookData).to.deep.equal(expectedAppHookData);
hookData += 'appTerminate';
});
testDisposables.push(coreApi.registerHook('appTerminate', terminateFunc));

stream.addTestMessage(WorkerTerminateMsg.workerTerminate());

await stream.assertCalledWith(
WorkerTerminateMsg.receivedWorkerTerminateLog,
AppStartMsg.executingHooksLog(1, 'appTerminate'),
AppStartMsg.executedHooksLog('appTerminate')
);
expect(terminateFunc.callCount).to.be.equal(1);
expect(hookData).to.equal('preInvocpostInvocappTerminate');
});

it('hookData changes from appStart hooks are not persisted in invocation hook contexts', async () => {
const functionAppDirectory = __dirname;
const startFunc = sinon.spy((context: coreTypes.AppStartContext) => {
Expand Down Expand Up @@ -982,6 +1040,61 @@ describe('InvocationHandler', () => {
expect(hookData).to.equal('appStartpreInvocpostInvoc');
});

it('hookData changes from invocation hooks are not persisted in app terminate contexts', async () => {
const expectedAppHookData = {
hello: 'world',
test: {
test2: 3,
},
};

loader.getFunction.returns({ callback: async () => {}, metadata: Binding.queue });

testDisposables.push(
coreApi.registerHook('preInvocation', (context: coreTypes.PreInvocationContext) => {
Object.assign(context.hookData, expectedAppHookData);
expect(context.appHookData).to.be.empty;
hookData += 'preInvoc';
})
);

testDisposables.push(
coreApi.registerHook('postInvocation', (context: coreTypes.PostInvocationContext) => {
expect(context.hookData).to.deep.equal(expectedAppHookData);
expect(context.appHookData).to.be.empty;
hookData += 'postInvoc';
})
);

sendInvokeMessage([InputData.http]);
await stream.assertCalledWith(
Msg.receivedInvocLog(),
Msg.executingHooksLog(1, 'preInvocation'),
Msg.executedHooksLog('preInvocation'),
Msg.executingHooksLog(1, 'postInvocation'),
Msg.executedHooksLog('postInvocation'),
Msg.invocResponse([])
);

const terminateFunc = sinon.spy((context: coreTypes.AppTerminateContext) => {
expect(context.appHookData).to.be.empty;
expect(context.hookData).to.be.empty;
hookData += 'appTerminate';
});
testDisposables.push(coreApi.registerHook('appTerminate', terminateFunc));

stream.addTestMessage(WorkerTerminateMsg.workerTerminate());

await stream.assertCalledWith(
WorkerTerminateMsg.receivedWorkerTerminateLog,
AppStartMsg.executingHooksLog(1, 'appTerminate'),
AppStartMsg.executedHooksLog('appTerminate')
);

expect(terminateFunc.callCount).to.be.equal(1);
expect(hookData).to.equal('preInvocpostInvocappTerminate');
});

it('appHookData changes are persisted between invocation-level hooks', async () => {
const expectedAppHookData = {
hello: 'world',
Expand Down
1 change: 1 addition & 0 deletions test/eventHandlers/WorkerInitHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export namespace Msg {
UseNullableValueDictionaryForHttp: 'true',
WorkerStatus: 'true',
TypedDataCollection: 'true',
HandlesWorkerTerminateMessage: 'true',
},
result: {
status: rpc.StatusResult.Status.Success,
Expand Down
Loading