Skip to content

Fix memo payload metadata memory sharing bug #901

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,11 @@ async def decode_activation(
for val in job.initialize_workflow.memo.fields.values():
# This uses API payload not bridge payload
new_payload = (await codec.decode([val]))[0]
# Make a shallow copy, in case new_payload.metadata and val.metadata are
# references to the same memory, e.g. decode() returns its input unchanged.
new_metadata = dict(new_payload.metadata)
val.metadata.clear()
val.metadata.update(new_payload.metadata)
val.metadata.update(new_metadata)
val.data = new_payload.data
elif job.HasField("do_update"):
await _decode_payloads(job.do_update.input, codec)
Expand Down
25 changes: 25 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,31 @@ async def test_workflow_with_passthrough_codec(client: Client):
await test_workflow_simple_activity(client)


@workflow.defn
class MemoDecodingWorkflow:
@workflow.run
async def run(self, memo_key: str) -> Any:
return workflow.memo_value(memo_key)


async def test_workflow_memo_decoding_with_passthrough_codec(client: Client):
# This used to fail because activation decoding accidentally cleared the memo
# payload metadata (containing the encoding) due to memory sharing between the
# before-decoding and after-decoding value
config = client.config()
config["data_converter"] = DataConverter(payload_codec=PassThroughCodec())
client = Client(**config)
async with new_worker(client, MemoDecodingWorkflow) as worker:
memo_value = await client.execute_workflow(
MemoDecodingWorkflow.run,
"memokey",
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
memo={"memokey": {"memoval_key": "memoval_value"}},
)
assert memo_value == {"memoval_key": "memoval_value"}


class CustomWorkflowRunner(WorkflowRunner):
def __init__(self) -> None:
super().__init__()
Expand Down