Skip to content

Commit 09a9316

Browse files
chore: move flag checking logic to interceptor
1 parent 76676ca commit 09a9316

File tree

4 files changed

+13
-77
lines changed

4 files changed

+13
-77
lines changed

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { instrument, extractContextFromHeaders, headersWithContext } from '../in
2222
import { ContextManager } from './context-manager';
2323
import { SpanName, SPAN_DELIMITER } from './definitions';
2424
import { SpanExporter } from './span-exporter';
25-
import { ensureWorkflowModuleLoaded, getWorkflowModule } from './workflow-module-loader';
25+
import { ensureWorkflowModuleLoaded, getSdkFlagsChecking, getWorkflowModule } from './workflow-module-loader';
2626

2727
export * from './definitions';
2828

@@ -77,12 +77,14 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
7777
input: SignalInput,
7878
next: Next<WorkflowInboundCallsInterceptor, 'handleSignal'>
7979
): Promise<void> {
80+
const { getActivator, SdkFlags } = getSdkFlagsChecking();
81+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield);
8082
const context = extractContextFromHeaders(input.headers);
8183
return await instrument({
8284
tracer: this.tracer,
8385
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
8486
fn: () => next(input),
85-
context,
87+
context: shouldInjectYield ? await Promise.resolve(context) : context,
8688
});
8789
}
8890
}

packages/interceptors-opentelemetry/src/workflow/workflow-module-loader.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
* @module
44
*/
55
import type * as WorkflowModule from '@temporalio/workflow';
6+
import type { SdkFlags as SdkFlagsT } from '@temporalio/workflow/lib/flags';
7+
import type { getActivator as getActivatorT } from '@temporalio/workflow/lib/global-attributes';
68

79
// @temporalio/workflow is an optional peer dependency.
810
// It can be missing as long as the user isn't attempting to construct a workflow interceptor.
@@ -30,6 +32,12 @@ export function getWorkflowModule(): typeof WorkflowModule {
3032
return workflowModule!;
3133
}
3234

35+
export function getSdkFlagsChecking(): { getActivator: typeof getActivatorT; SdkFlags: typeof SdkFlagsT } {
36+
const { SdkFlags } = require('@temporalio/workflow/lib/flags');
37+
const { getActivator } = require('@temporalio/workflow/lib/global-attributes');
38+
return { getActivator, SdkFlags };
39+
}
40+
3341
/**
3442
* Checks if the workflow module loaded successfully and throws if not.
3543
*/

packages/test/src/workflows/inbound-signal.ts

Whitespace-only changes.

packages/workflow/src/internals.ts

Lines changed: 1 addition & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -980,11 +980,7 @@ export class Activator implements ActivationHandler {
980980

981981
const signalExecutionNum = this.signalHandlerExecutionSeq++;
982982
this.inProgressSignals.set(signalExecutionNum, { name: signalName, unfinishedPolicy });
983-
const execute = composeInterceptors(
984-
this.maybeInjectYieldForOtelHandler(interceptors),
985-
'handleSignal',
986-
this.signalWorkflowNextHandler.bind(this)
987-
);
983+
const execute = composeInterceptors(interceptors, 'handleSignal', this.signalWorkflowNextHandler.bind(this));
988984
execute({
989985
args: arrayFromPayloads(this.payloadConverter, activation.input),
990986
signalName,
@@ -1261,31 +1257,6 @@ export class Activator implements ActivationHandler {
12611257
failureToError(failure: ProtoFailure): Error {
12621258
return this.failureConverter.failureToError(failure, this.payloadConverter);
12631259
}
1264-
1265-
private maybeInjectYieldForOtelHandler(
1266-
interceptors: NonNullable<WorkflowInterceptors['inbound']>
1267-
): NonNullable<WorkflowInterceptors['inbound']> {
1268-
if (!this.hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield)) {
1269-
return [...interceptors];
1270-
}
1271-
const otelInboundInterceptorIndex = findOpenTelemetryInboundInterceptor(interceptors);
1272-
if (otelInboundInterceptorIndex === null) {
1273-
return [...interceptors];
1274-
}
1275-
// A handler that only serves the insert a yield point in the interceptor handlers
1276-
const yieldHandleSignalInterceptor: NonNullable<WorkflowInterceptors['inbound']>[number] = {
1277-
handleSignal: async (input, next) => {
1278-
await Promise.resolve();
1279-
return next(input);
1280-
},
1281-
};
1282-
// Insert the yield handler before the OTEL one to synthesize the yield point added in the affected versions of the handler
1283-
return [
1284-
...interceptors.slice(0, otelInboundInterceptorIndex),
1285-
yieldHandleSignalInterceptor,
1286-
...interceptors.slice(otelInboundInterceptorIndex),
1287-
];
1288-
}
12891260
}
12901261

12911262
function getSeq<T extends { seq?: number | null }>(activation: T): number {
@@ -1336,48 +1307,3 @@ then you can disable this warning by passing an option when setting the handler:
13361307
Array.from(names.entries()).map(([name, count]) => ({ name, count }))
13371308
)}`;
13381309
}
1339-
1340-
// Should only get run on replay
1341-
function shouldInjectYield(version?: string): boolean {
1342-
if (!version) {
1343-
return false;
1344-
}
1345-
const [major, minor, patchAndTags] = version.split('.', 3);
1346-
// 1.11.5 - 1.13.1: need to inject
1347-
if (major !== '1') return false;
1348-
1349-
// patch might have some extra stuff that needs cleaning
1350-
// basically "takeWhile digit"
1351-
let patch;
1352-
try {
1353-
const patchDigits = /[0-9]+/.exec(patchAndTags)?.[0];
1354-
patch = patchDigits ? Number.parseInt(patchDigits) : null;
1355-
} catch {
1356-
patch = null;
1357-
}
1358-
1359-
switch (minor) {
1360-
case '11':
1361-
// 1.11.3 was the last release that didn't inject a yield point
1362-
return Boolean(patch && patch > 3);
1363-
case '12':
1364-
// Every 1.12 release requires a yield
1365-
return true;
1366-
case '13':
1367-
// 1.13.2 will be the first release since 1.11.3 that doesn't have a yield point in `handleSignal`
1368-
return Boolean(patch && patch < 2);
1369-
default:
1370-
return false;
1371-
}
1372-
}
1373-
1374-
function findOpenTelemetryInboundInterceptor(
1375-
interceptors: NonNullable<WorkflowInterceptors['inbound']>
1376-
): number | null {
1377-
const index = interceptors.findIndex(
1378-
(interceptor) =>
1379-
// We use a marker instead of `instanceof` to avoid taking a dependency on @temporalio/interceptors-opentelemetry
1380-
(interceptor as NonNullable<WorkflowInterceptors['inbound']> & { maybeInjectYield: boolean }).maybeInjectYield
1381-
);
1382-
return index !== -1 ? index : null;
1383-
}

0 commit comments

Comments
 (0)