Skip to content

fix(workflow): Workflow Activation Encoder was discarding SDK flags #1530

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 97 additions & 74 deletions packages/common/src/proto-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,85 +5,99 @@ import { patchProtobufRoot } from '@temporalio/proto/lib/patch-protobuf-root';
export type History = proto.temporal.api.history.v1.IHistory;
export type Payload = proto.temporal.api.common.v1.IPayload;

/**
* JSON representation of Temporal's {@link Payload} protobuf object
*/
export interface JSONPayload {
/**
* Mapping of key to base64 encoded value
*/
metadata?: Record<string, string> | null;
/**
* base64 encoded value
*/
data?: string | null;
}

// Cast to any because the generated proto module types are missing the lookupType method
const patched = patchProtobufRoot(proto) as any;
const historyType = patched.lookupType('temporal.api.history.v1.History');
const payloadType = patched.lookupType('temporal.api.common.v1.Payload');

function pascalCaseToConstantCase(s: string) {
return s.replace(/[^\b][A-Z]/g, (m) => `${m[0]}_${m[1]}`).toUpperCase();
}

function fixEnumValue<O extends Record<string, any>>(obj: O, attr: keyof O, prefix: string) {
return (
obj[attr] && {
[attr]: obj[attr].startsWith(prefix) ? obj[attr] : `${prefix}_${pascalCaseToConstantCase(obj[attr])}`,
}
);
}
/**
* Convert a proto JSON representation of History to a valid History object
*/
export function historyFromJSON(history: unknown): History {
function pascalCaseToConstantCase(s: string) {
return s.replace(/[^\b][A-Z]/g, (m) => `${m[0]}_${m[1]}`).toUpperCase();
}

// fromProto3JSON doesn't allow null values on 'bytes' fields. This turns out to be a problem for payloads.
// Recursively descend on objects and array, and fix in-place any payload that has a null data field
function fixPayloads<T>(e: T): T {
function isPayload(p: any): p is JSONPayload {
return p && typeof p === 'object' && 'metadata' in p && 'data' in p;
function fixEnumValue<O extends Record<string, any>>(obj: O, attr: keyof O, prefix: string) {
return (
obj[attr] && {
[attr]: obj[attr].startsWith(prefix) ? obj[attr] : `${prefix}_${pascalCaseToConstantCase(obj[attr])}`,
}
);
}

if (e && typeof e === 'object') {
if (isPayload(e)) {
if (e.data === null) {
const { data: _data, ...rest } = e;
return rest as T;
// fromProto3JSON doesn't allow null values on 'bytes' fields. This turns out to be a problem for payloads.
// Recursively descend on objects and array, and fix in-place any payload that has a null data field
function fixPayloads<T>(e: T): T {
function isPayload(p: any): p is JSONPayload {
return p && typeof p === 'object' && 'metadata' in p && 'data' in p;
}

if (e && typeof e === 'object') {
if (isPayload(e)) {
if (e.data === null) {
const { data: _data, ...rest } = e;
return rest as T;
}
return e;
}
return e;
if (Array.isArray(e)) return e.map(fixPayloads) as T;
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixPayloads(v)])) as T;
}
if (Array.isArray(e)) return e.map(fixPayloads) as T;
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixPayloads(v)])) as T;
return e;
}
return e;
}

function fixHistoryEvent(e: Record<string, any>) {
const type = Object.keys(e).find((k) => k.endsWith('EventAttributes'));
if (!type) {
throw new TypeError(`Missing attributes in history event: ${JSON.stringify(e)}`);
}
function fixHistoryEvent(e: Record<string, any>) {
const type = Object.keys(e).find((k) => k.endsWith('EventAttributes'));
if (!type) {
throw new TypeError(`Missing attributes in history event: ${JSON.stringify(e)}`);
}

// Fix payloads with null data
e = fixPayloads(e);

return {
...e,
...fixEnumValue(e, 'eventType', 'EVENT_TYPE'),
[type]: {
...e[type],
...(e[type].taskQueue && {
taskQueue: { ...e[type].taskQueue, ...fixEnumValue(e[type].taskQueue, 'kind', 'TASK_QUEUE_KIND') },
}),
...fixEnumValue(e[type], 'parentClosePolicy', 'PARENT_CLOSE_POLICY'),
...fixEnumValue(e[type], 'workflowIdReusePolicy', 'WORKFLOW_ID_REUSE_POLICY'),
...fixEnumValue(e[type], 'initiator', 'CONTINUE_AS_NEW_INITIATOR'),
...fixEnumValue(e[type], 'retryState', 'RETRY_STATE'),
...(e[type].childWorkflowExecutionFailureInfo && {
childWorkflowExecutionFailureInfo: {
...e[type].childWorkflowExecutionFailureInfo,
...fixEnumValue(e[type].childWorkflowExecutionFailureInfo, 'retryState', 'RETRY_STATE'),
},
}),
},
};
}
// Fix payloads with null data
e = fixPayloads(e);

function fixHistory(h: Record<string, any>) {
return {
events: h.events.map(fixHistoryEvent),
};
}
return {
...e,
...fixEnumValue(e, 'eventType', 'EVENT_TYPE'),
[type]: {
...e[type],
...(e[type].taskQueue && {
taskQueue: { ...e[type].taskQueue, ...fixEnumValue(e[type].taskQueue, 'kind', 'TASK_QUEUE_KIND') },
}),
...fixEnumValue(e[type], 'parentClosePolicy', 'PARENT_CLOSE_POLICY'),
...fixEnumValue(e[type], 'workflowIdReusePolicy', 'WORKFLOW_ID_REUSE_POLICY'),
...fixEnumValue(e[type], 'initiator', 'CONTINUE_AS_NEW_INITIATOR'),
...fixEnumValue(e[type], 'retryState', 'RETRY_STATE'),
...(e[type].childWorkflowExecutionFailureInfo && {
childWorkflowExecutionFailureInfo: {
...e[type].childWorkflowExecutionFailureInfo,
...fixEnumValue(e[type].childWorkflowExecutionFailureInfo, 'retryState', 'RETRY_STATE'),
},
}),
},
};
}

function fixHistory(h: Record<string, any>) {
return {
events: h.events.map(fixHistoryEvent),
};
}

/**
* Convert a proto JSON representation of History to a valid History object
*/
export function historyFromJSON(history: unknown): History {
if (typeof history !== 'object' || history == null || !Array.isArray((history as any).events)) {
throw new TypeError('Invalid history, expected an object with an array of events');
}
Expand All @@ -95,17 +109,26 @@ export function historyFromJSON(history: unknown): History {
}

/**
* JSON representation of Temporal's {@link Payload} protobuf object
* Convert an History object, e.g. as returned by `WorkflowClient.list().withHistory()`, to a JSON
* string that adheres to the same norm as JSON history files produced by other Temporal tools.
*/
export interface JSONPayload {
/**
* Mapping of key to base64 encoded value
*/
metadata?: Record<string, string> | null;
/**
* base64 encoded value
*/
data?: string | null;
export function historyToJSON(history: History): string {
// toProto3JSON doesn't correctly handle some of our "bytes" fields, passing them untouched to the
// output, after which JSON.stringify() would convert them to an array of numbers. As a workaround,
// recursively walk the object and convert all Buffer instances to base64 strings. Note this only
// works on proto3-json-serializer v2.0.0. v2.0.2 throws an error before we even get the chance
// to fix the buffers. See https://github.com/googleapis/proto3-json-serializer-nodejs/issues/103.
function fixBuffers<T>(e: T): T {
if (e && typeof e === 'object') {
if (e instanceof Buffer) return e.toString('base64') as any;
if (Array.isArray(e)) return e.map(fixBuffers) as T;
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixBuffers(v)])) as T;
}
return e;
}

const protoJson = toProto3JSON(proto.temporal.api.history.v1.History.fromObject(history) as any);
return JSON.stringify(fixBuffers(protoJson), null, 2);
}

/**
Expand Down
194 changes: 194 additions & 0 deletions packages/test/history_files/lang_flags_replay_correctly_1_11_1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-09-23T18:06:34.810426Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048604",
"workflowExecutionStartedEventAttributes": {
"workflowType": { "name": "langFlagsReplayCorrectly" },
"taskQueue": { "name": "Lang's_SDK_flags_replay_correctly", "kind": "TASK_QUEUE_KIND_NORMAL" },
"input": {},
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "6cb24f2a-74ed-4a8e-9db0-8a0e3dfebaef",
"identity": "12645@JamesMBTemporal",
"firstExecutionRunId": "6cb24f2a-74ed-4a8e-9db0-8a0e3dfebaef",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": { "fields": {} },
"workflowId": "4d57b445-d3e7-4e20-bef9-36193a29c380"
}
},
{
"eventId": "2",
"eventTime": "2024-09-23T18:06:34.810489Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048605",
"workflowTaskScheduledEventAttributes": {
"taskQueue": { "name": "Lang's_SDK_flags_replay_correctly", "kind": "TASK_QUEUE_KIND_NORMAL" },
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-09-23T18:06:34.816506Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048610",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "12645@JamesMBTemporal",
"requestId": "7db844e1-00d2-4536-b37c-4eaa8abc9d58",
"historySizeBytes": "320",
"workerVersion": {
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
}
}
},
{
"eventId": "4",
"eventTime": "2024-09-23T18:06:34.869609Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048615",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "12645@JamesMBTemporal",
"workerVersion": {
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
},
"sdkMetadata": { "coreUsedFlags": [3, 1, 2] },
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-09-23T18:06:34.869660Z",
"eventType": "EVENT_TYPE_TIMER_STARTED",
"taskId": "1048616",
"timerStartedEventAttributes": {
"timerId": "1",
"startToFireTimeout": "10s",
"workflowTaskCompletedEventId": "4"
}
},
{
"eventId": "6",
"eventTime": "2024-09-23T18:06:34.869701Z",
"eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
"taskId": "1048617",
"activityTaskScheduledEventAttributes": {
"activityId": "1",
"activityType": { "name": "noopActivity" },
"taskQueue": { "name": "Lang's_SDK_flags_replay_correctly", "kind": "TASK_QUEUE_KIND_NORMAL" },
"header": { "fields": {} },
"scheduleToCloseTimeout": "10s",
"scheduleToStartTimeout": "10s",
"startToCloseTimeout": "10s",
"heartbeatTimeout": "0s",
"workflowTaskCompletedEventId": "4",
"retryPolicy": { "initialInterval": "1s", "backoffCoefficient": 2, "maximumInterval": "100s" },
"useWorkflowBuildId": true
}
},
{
"eventId": "7",
"eventTime": "2024-09-23T18:06:34.869850Z",
"eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED",
"taskId": "1048622",
"activityTaskStartedEventAttributes": {
"scheduledEventId": "6",
"identity": "12645@JamesMBTemporal",
"requestId": "db9e723d-202c-4d75-ae5f-06a8e5489428",
"attempt": 1,
"workerVersion": {
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
}
}
},
{
"eventId": "8",
"eventTime": "2024-09-23T18:06:34.878511Z",
"eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
"taskId": "1048623",
"activityTaskCompletedEventAttributes": {
"result": {
"payloads": [{ "metadata": { "encoding": "YmluYXJ5L251bGw=" }, "data": "" }]
},
"scheduledEventId": "6",
"startedEventId": "7",
"identity": "12645@JamesMBTemporal"
}
},
{
"eventId": "9",
"eventTime": "2024-09-23T18:06:34.878515Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048624",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "12645@JamesMBTemporal-d05d6c11960c40e792c37014aed3ec23",
"kind": "TASK_QUEUE_KIND_STICKY",
"normalName": "Lang's_SDK_flags_replay_correctly"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "10",
"eventTime": "2024-09-23T18:06:34.879463Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048628",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "9",
"identity": "12645@JamesMBTemporal",
"requestId": "805b6064-313b-4a4d-a4fd-d29ec4a1afea",
"historySizeBytes": "1223",
"workerVersion": {
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
}
}
},
{
"eventId": "11",
"eventTime": "2024-09-23T18:06:34.889077Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048632",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "9",
"startedEventId": "10",
"identity": "12645@JamesMBTemporal",
"workerVersion": {
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
},
"sdkMetadata": {},
"meteringMetadata": {}
}
},
{
"eventId": "12",
"eventTime": "2024-09-23T18:06:34.889101Z",
"eventType": "EVENT_TYPE_TIMER_CANCELED",
"taskId": "1048633",
"timerCanceledEventAttributes": {
"timerId": "1",
"startedEventId": "5",
"workflowTaskCompletedEventId": "11",
"identity": "12645@JamesMBTemporal"
}
},
{
"eventId": "13",
"eventTime": "2024-09-23T18:06:34.889124Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1048634",
"workflowExecutionCompletedEventAttributes": {
"result": {
"payloads": [{ "metadata": { "encoding": "YmluYXJ5L251bGwK" }, "data": "" }]
},
"workflowTaskCompletedEventId": "11"
}
}
]
}
Loading
Loading