Skip to content

Commit 4731405

Browse files
clean up tests and flags
1 parent c4a15ce commit 4731405

File tree

5 files changed

+103
-107
lines changed

5 files changed

+103
-107
lines changed

packages/interceptors-opentelemetry/src/instrumentation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export function extractContextFromHeaders(headers: Headers): otel.Context | unde
2727
/**
2828
* Given headers, return new headers with the current otel context inserted
2929
*/
30-
export async function headersWithContext(headers: Headers): Promise<Headers> {
30+
export function headersWithContext(headers: Headers): Headers {
3131
const carrier = {};
3232
otel.propagation.inject(otel.context.active(), carrier, otel.defaultTextMapSetter);
3333
return { ...headers, [TRACE_HEADER]: payloadConverter.toPayload(carrier) };

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
6161
): Promise<unknown> {
6262
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
6363
const context = extractContextFromHeaders(input.headers);
64+
if (shouldInjectYield) await Promise.resolve();
6465
return await instrument({
6566
tracer: this.tracer,
6667
spanName: `${SpanName.WORKFLOW_EXECUTE}${SPAN_DELIMITER}${workflowInfo().workflowType}`,
6768
fn: () => next(input),
68-
context: shouldInjectYield ? await Promise.resolve(context) : context,
69+
context,
6970
acceptableErrors: (err) => err instanceof ContinueAsNew,
7071
});
7172
}
@@ -76,11 +77,12 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
7677
): Promise<void> {
7778
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield);
7879
const context = extractContextFromHeaders(input.headers);
80+
if (shouldInjectYield) await Promise.resolve();
7981
return await instrument({
8082
tracer: this.tracer,
8183
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
8284
fn: () => next(input),
83-
context: shouldInjectYield ? await Promise.resolve(context) : context,
85+
context,
8486
});
8587
}
8688
}
@@ -97,11 +99,13 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
9799
input: ActivityInput,
98100
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleActivity'>
99101
): Promise<unknown> {
102+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
100103
return await instrument({
101104
tracer: this.tracer,
102105
spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`,
103106
fn: async () => {
104-
const headers = await headersWithContext(input.headers);
107+
const headers = headersWithContext(input.headers);
108+
if (shouldInjectYield) await Promise.resolve();
105109
return next({
106110
...input,
107111
headers,
@@ -114,11 +118,13 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
114118
input: LocalActivityInput,
115119
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleLocalActivity'>
116120
): Promise<unknown> {
121+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryScheduleLocalActivityInterceptorInsertYield);
117122
return await instrument({
118123
tracer: this.tracer,
119124
spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`,
120125
fn: async () => {
121-
const headers = await headersWithContext(input.headers);
126+
const headers = headersWithContext(input.headers);
127+
if (shouldInjectYield) await Promise.resolve();
122128
return next({
123129
...input,
124130
headers,
@@ -131,11 +137,13 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
131137
input: StartChildWorkflowExecutionInput,
132138
next: Next<WorkflowOutboundCallsInterceptor, 'startChildWorkflowExecution'>
133139
): Promise<[Promise<string>, Promise<unknown>]> {
140+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
134141
return await instrument({
135142
tracer: this.tracer,
136143
spanName: `${SpanName.CHILD_WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
137144
fn: async () => {
138-
const headers = await headersWithContext(input.headers);
145+
const headers = headersWithContext(input.headers);
146+
if (shouldInjectYield) await Promise.resolve();
139147
return next({
140148
...input,
141149
headers,
@@ -148,11 +156,13 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
148156
input: ContinueAsNewInput,
149157
next: Next<WorkflowOutboundCallsInterceptor, 'continueAsNew'>
150158
): Promise<never> {
159+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
151160
return await instrument({
152161
tracer: this.tracer,
153162
spanName: `${SpanName.CONTINUE_AS_NEW}${SPAN_DELIMITER}${input.options.workflowType}`,
154163
fn: async () => {
155-
const headers = await headersWithContext(input.headers);
164+
const headers = headersWithContext(input.headers);
165+
if (shouldInjectYield) await Promise.resolve();
156166
return next({
157167
...input,
158168
headers,
@@ -166,11 +176,13 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
166176
input: SignalWorkflowInput,
167177
next: Next<WorkflowOutboundCallsInterceptor, 'signalWorkflow'>
168178
): Promise<void> {
179+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
169180
return await instrument({
170181
tracer: this.tracer,
171182
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
172183
fn: async () => {
173-
const headers = await headersWithContext(input.headers);
184+
const headers = headersWithContext(input.headers);
185+
if (shouldInjectYield) await Promise.resolve();
174186
return next({
175187
...input,
176188
headers,

packages/test/src/test-flags.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ function composeConditions(conditions: Conditions): NonNullable<Conditions>[numb
1616

1717
test('OpenTelemetryHandleSignalInterceptorInsertYield enabled by version', (t) => {
1818
const cases = [
19+
{ version: undefined, expected: false },
1920
{ version: '1.0.0', expected: false },
2021
{ version: '1.11.3', expected: false },
2122
{ version: '1.11.5', expected: true },
@@ -40,6 +41,8 @@ test('OpenTelemetryHandleSignalInterceptorInsertYield enabled by version', (t) =
4041

4142
test('OpenTelemetryInterceptorInsertYield enabled by version', (t) => {
4243
const cases = [
44+
// If there isn't any SDK version available we enable this flag as these yields were present since the initial version of the OTEL interceptors
45+
{ version: undefined, expected: true },
4346
{ version: '0.1.0', expected: true },
4447
{ version: '1.0.0', expected: true },
4548
{ version: '1.9.0-rc.0', expected: true },
@@ -57,3 +60,30 @@ test('OpenTelemetryInterceptorInsertYield enabled by version', (t) => {
5760
t.is(actual, expected, `Expected OpenTelemetryInterceptorInsertYield on ${version} to evaluate as ${expected}`);
5861
}
5962
});
63+
64+
test('OpenTelemetryScheduleLocalActivityInterceptorInsertYield enabled by version', (t) => {
65+
const cases = [
66+
{ version: undefined, expected: false },
67+
{ version: '1.0.0', expected: false },
68+
{ version: '1.11.3', expected: false },
69+
{ version: '1.11.5', expected: false },
70+
{ version: '1.11.6', expected: true },
71+
{ version: '1.12.0', expected: true },
72+
{ version: '1.13.1', expected: true },
73+
{ version: '1.13.2', expected: false },
74+
{ version: '1.14.0', expected: false },
75+
];
76+
for (const { version, expected } of cases) {
77+
const actual = composeConditions(
78+
SdkFlags.OpenTelemetryScheduleLocalActivityInterceptorInsertYield.alternativeConditions
79+
)({
80+
info: {} as WorkflowInfo,
81+
sdkVersion: version,
82+
});
83+
t.is(
84+
actual,
85+
expected,
86+
`Expected OpenTelemetryScheduleLocalActivityInterceptorInsertYield on ${version} to evaluate as ${expected}`
87+
);
88+
}
89+
});

packages/test/src/test-otel.ts

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
import { OpenTelemetrySinks, SpanName, SPAN_DELIMITER } from '@temporalio/interceptors-opentelemetry/lib/workflow';
2525
import { DefaultLogger, InjectedSinks, Runtime } from '@temporalio/worker';
2626
import * as activities from './activities';
27-
import { loadHistory, RUN_INTEGRATION_TESTS, saveHistory, TestWorkflowEnvironment, Worker } from './helpers';
27+
import { loadHistory, RUN_INTEGRATION_TESTS, TestWorkflowEnvironment, Worker } from './helpers';
2828
import * as workflows from './workflows';
2929
import { createTestWorkflowBundle } from './helpers-integration';
3030

@@ -559,40 +559,7 @@ test('Can replay otel history from 1.13.1', async (t) => {
559559
});
560560

561561
test('Can replay smorgasbord from 1.13.1', async (t) => {
562-
/*
563-
const staticResource = new opentelemetry.resources.Resource({
564-
[SemanticResourceAttributes.SERVICE_NAME]: 'ts-test-otel-worker',
565-
});
566-
const worker = await Worker.create({
567-
workflowsPath: require.resolve('./workflows'),
568-
activities,
569-
taskQueue: 'test-otel-inbound',
570-
sinks: {
571-
exporter: makeWorkflowExporter(new InMemorySpanExporter(), staticResource),
572-
},
573-
interceptors: {
574-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
575-
activity: [
576-
(ctx) => ({
577-
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),
578-
}),
579-
],
580-
},
581-
});
582-
const client = new WorkflowClient();
583-
584-
const result = await worker.runUntil(async () => {
585-
const handle = await client.start(workflows.smorgasbord, {
586-
taskQueue: 'test-otel-inbound',
587-
workflowId: uuid4(),
588-
});
589-
const result = await handle.result();
590-
const history = await handle.fetchHistory();
591-
await saveHistory('smorg_with_otel.json', history);
592-
return result;
593-
});
594-
*/
595-
562+
// This test will trigger NDE if yield points for `scheduleActivity` and `startChildWorkflowExecution` are not inserted
596563
const hist = await loadHistory('otel_smorgasbord_1_13_1.json');
597564
await t.notThrowsAsync(async () => {
598565
await Worker.runReplayHistory(
@@ -616,41 +583,6 @@ test('Can replay smorgasbord from 1.13.1', async (t) => {
616583
});
617584

618585
test('Can replay signal workflow from 1.13.1', async (t) => {
619-
/*
620-
const staticResource = new opentelemetry.resources.Resource({
621-
[SemanticResourceAttributes.SERVICE_NAME]: 'ts-test-otel-worker',
622-
});
623-
const worker = await Worker.create({
624-
workflowsPath: require.resolve('./workflows/signal-workflow'),
625-
activities: [],
626-
taskQueue: 'test-otel-inbound',
627-
sinks: {
628-
exporter: makeWorkflowExporter(new InMemorySpanExporter(), staticResource),
629-
},
630-
interceptors: {
631-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
632-
activity: [
633-
(ctx) => ({
634-
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),
635-
}),
636-
],
637-
},
638-
});
639-
const client = new WorkflowClient();
640-
641-
const result = await worker.runUntil(async () => {
642-
const handle = await client.start(workflows.topSecretGreeting, {
643-
args: ['Temporal'],
644-
taskQueue: 'test-otel-inbound',
645-
workflowId: uuid4(),
646-
});
647-
const result = await handle.result();
648-
const history = await handle.fetchHistory();
649-
await saveHistory('signal_workflow_1_13_1.json', history);
650-
return result;
651-
});
652-
*/
653-
654586
const hist = await loadHistory('signal_workflow_1_13_1.json');
655587
await t.notThrowsAsync(async () => {
656588
await Worker.runReplayHistory(
@@ -671,5 +603,4 @@ test('Can replay signal workflow from 1.13.1', async (t) => {
671603
hist
672604
);
673605
});
674-
t.pass();
675606
});

packages/workflow/src/flags.ts

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,25 @@ export const SdkFlags = {
6363
* The interceptors provided by @temporalio/interceptors-opentelemetry initially had unnecessary yield points.
6464
* If replaying a workflow created from these versions a yield point is injected to prevent any NDE.
6565
*
66+
* If the history does not include the SDK version, default to enabled since the yields were present since the OTEL
67+
* package was created.
68+
*
69+
* @since Introduced in 1.13.2
70+
*/
71+
OpenTelemetryInterceptorInsertYield: defineFlag(3, false, [isBefore({ major: 1, minor: 13, patch: 2 }, true)]),
72+
/**
73+
* In 1.11.6, the `scheduleLocalActivity` interceptor was added to
74+
* `@temporalio/interceptors-opentelemetry` which added a yield point to the
75+
* outbound interceptor. This yield point was removed in 1.13.2.
76+
*
77+
* If replaying a workflow from 1.11.6 up to 1.13.1, we insert a yield point
78+
* in the interceptor to match the behavior.
79+
*
6680
* @since Introduced in 1.13.2
6781
*/
68-
OpenTelemetryInterceptorInsertYield: defineFlag(3, false, [isBefore({ major: 1, minor: 13, patch: 2 })]),
82+
OpenTelemetryScheduleLocalActivityInterceptorInsertYield: defineFlag(4, false, [
83+
isBetween({ major: 1, minor: 11, patch: 5 }, { major: 1, minor: 13, patch: 2 }),
84+
]),
6985
} as const;
7086

7187
function defineFlag(id: number, def: boolean, alternativeConditions?: AltConditionFn[]): SdkFlag {
@@ -103,6 +119,40 @@ type SemVer = {
103119
patch: number;
104120
};
105121

122+
/**
123+
* Creates an `AltConditionFn` that checks if the SDK version is before the provided version.
124+
* An optional default can be provided in case the SDK version is not available.
125+
*/
126+
function isBefore(compare: SemVer, missingDefault?: boolean): AltConditionFn {
127+
return isCompared(compare, 1, missingDefault);
128+
}
129+
130+
/**
131+
* Creates an `AltConditionFn` that checks if the SDK version is after the provided version.
132+
* An optional default can be provided in case the SDK version is not available.
133+
*/
134+
function isAfter(compare: SemVer, missingDefault?: boolean): AltConditionFn {
135+
return isCompared(compare, -1, missingDefault);
136+
}
137+
138+
/**
139+
* Creates an `AltConditionFn` that checks if the SDK version is between the provided versions.
140+
* The range check is exclusive.
141+
* An optional default can be provided in case the SDK version is not available.
142+
*/
143+
function isBetween(lowEnd: SemVer, highEnd: SemVer, missingDefault?: boolean): AltConditionFn {
144+
return (ctx) => isAfter(lowEnd, missingDefault)(ctx) && isBefore(highEnd, missingDefault)(ctx);
145+
}
146+
147+
function isCompared(compare: SemVer, comparison: -1 | 0 | 1, missingDefault: boolean = false): AltConditionFn {
148+
return ({ sdkVersion }) => {
149+
if (!sdkVersion) return missingDefault;
150+
const version = parseSemver(sdkVersion);
151+
if (!version) return missingDefault;
152+
return compareSemver(compare, version) === comparison;
153+
};
154+
}
155+
106156
function parseSemver(version: string): SemVer | undefined {
107157
const matches = version.match(/(\d+)\.(\d+)\.(\d+)/);
108158
if (!matches) return undefined;
@@ -132,30 +182,3 @@ function compareSemver(a: SemVer, b: SemVer): -1 | 0 | 1 {
132182
if (a.patch > b.patch) return 1;
133183
return 0;
134184
}
135-
136-
function isCompared(compare: SemVer, comparison: -1 | 0 | 1): AltConditionFn {
137-
return ({ sdkVersion }) => {
138-
if (!sdkVersion) throw new Error('no sdk version');
139-
if (!sdkVersion) return false;
140-
const version = parseSemver(sdkVersion);
141-
if (!version) throw new Error(`no version for ${sdkVersion}`);
142-
if (!version) return false;
143-
return compareSemver(compare, version) === comparison;
144-
};
145-
}
146-
147-
function isBefore(compare: SemVer): AltConditionFn {
148-
return isCompared(compare, 1);
149-
}
150-
151-
function isEqual(compare: SemVer): AltConditionFn {
152-
return isCompared(compare, 0);
153-
}
154-
155-
function isAfter(compare: SemVer): AltConditionFn {
156-
return isCompared(compare, -1);
157-
}
158-
159-
function isBetween(lowEnd: SemVer, highEnd: SemVer): AltConditionFn {
160-
return (ctx) => isAfter(lowEnd)(ctx) && isBefore(highEnd)(ctx);
161-
}

0 commit comments

Comments
 (0)