Skip to content

Fix issue with codecs returning passed-in payloads #526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ async def _apply_to_payloads(
if len(payloads) == 0:
return
new_payloads = await cb(payloads)
if new_payloads is payloads:
return
del payloads[:]
# TODO(cretz): Copy too expensive?
payloads.extend(new_payloads)
Expand All @@ -189,9 +191,7 @@ async def _apply_to_payload(
) -> None:
"""Apply API payload callback to payload."""
new_payload = (await cb([payload]))[0]
payload.metadata.clear()
payload.metadata.update(new_payload.metadata)
payload.data = new_payload.data
payload.CopyFrom(new_payload)


async def _decode_payloads(
Expand Down
18 changes: 18 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,24 @@ async def test_workflow_with_codec(client: Client, env: WorkflowEnvironment):
await test_workflow_update_handlers_happy(client, env)


class PassThroughCodec(PayloadCodec):
async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
return list(payloads)

async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
return list(payloads)


async def test_workflow_with_passthrough_codec(client: Client):
# Make client with this codec and run the activity test. This used to fail
# because there was a bug where the codec couldn't reuse the passed-in
# payloads.
config = client.config()
config["data_converter"] = DataConverter(payload_codec=PassThroughCodec())
client = Client(**config)
await test_workflow_simple_activity(client)


class CustomWorkflowRunner(WorkflowRunner):
def __init__(self) -> None:
super().__init__()
Expand Down