Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve pane index through reshuffle. #34348

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

claudevdm
Copy link
Collaborator

@claudevdm claudevdm commented Mar 19, 2025

Should fix #32636.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@claudevdm claudevdm requested a review from kennknowles March 24, 2025 17:30
@claudevdm claudevdm marked this pull request as ready for review March 25, 2025 14:53
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @shunping for label python.
R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@@ -124,6 +124,7 @@ var (
CoderTimer = cdrUrn(pipepb.StandardCoders_TIMER)

CoderKV = cdrUrn(pipepb.StandardCoders_KV)
CoderTuple = "beam:coder:tuple:v1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems suspiciously out of place, being a magic string and also not a standard coder. What is the story behind it?

Copy link
Collaborator Author

@claudevdm claudevdm Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I placed it here because python switches between tuple coder depending on number of elements which cauuses python tests to fail with prism runner

if self.is_kv_coder():

For example, with the pane index change there ends up being a tuple coder with 3 elements (because pane info is now included)

coders {
    key: "ref_Coder_TupleCoder_6"
    value {
      spec {
        urn: "beam:coder:tuple:v1"
      }
      component_coder_ids: "ref_Coder_BytesCoder_1"
      component_coder_ids: "ref_Coder_NullableCoder_7"
      component_coder_ids: "ref_Coder_FastPrimitivesCoder_8"
    }
  }

But without pane info it falls back to using KV coder which is supported by prism

coders {
    key: "ref_Coder_TupleCoder_6"
    value {
      spec {
        urn: "beam:coder:kv:v1"
      }
      component_coder_ids: "ref_Coder_BytesCoder_1"
      component_coder_ids: "ref_Coder_NullableCoder_7"
    }
  }

Prism error without this change:
ERROR:root:prism error building stage stage-002:
unknown coder urn key: beam:coder:tuple:v1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like something we probably need to fix another way. The actual shuffle needs KVs, with everything we want to preserve reified into the value component.

Copy link
Collaborator Author

@claudevdm claudevdm Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual shuffle needs KVs

What do you mean by this? The way I understand it is

I guess if we want to avoid this we can have a nested kv in ReifyMetedata so it is

  • key, (value, (timestamp, pane_info))
    Then the regular kv coder should work?
    Or can we also use windowed_value as the value in the reify output instead of a tuple with the medatada?

The original reify just used a kv as the value in the reify function

return key, (value, timestamp)

We are now including pane info as mentioned above so it becomes a tuple

This only happens for global window case, in the custom window case the value for reify is a windowed value

return key, windowed_value.WindowedValue(value, timestamp, [window])

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Yea KV(key, (value, timestamp, pane_info) should be fine. The runner should not need to have any understanding of the coder for (value, timestamp, pane_info) since that is in general user type / coder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK... @lostluck does know Prism best. But it isn't in line with my take on the model. Tuple is a language-specific esoteric coder that isn't part of the Beam model and shouldn't be explicitly understood by anything outside the Python SDK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and of course big follow-up question: this coder is not new... presumably it already works, so why does this change require it to become runner-understood?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's as Claude says.

Basically the Python Tuple coder is an outlier: It pretends to be a standard beam coder, with an arbitrary number of components, and Python plays fast and loose with the notion of coder types. No other "custom coder" uses exposed sub components, essentially. Custom coders are usually fully opaque.

The problem here is I tried to avoid needing to enumerate all Beam coders with sub components that needed processing. We already had a "leaf" list, why do we also need a "composite" list? That means there are two approaches:

  1. This current PR's approach: Promote the janky python approach to be a known thing for all runners/SDKs. Since the URN is already pretending to be a standard URN, this isn't too hard, and it permits other SDKs to interoperate with that coder. AKA, turning the exception to be part of the standard.

  2. We are forced to specify known composite coders to avoid Length Prefixing them unnecessarily.
    So instead of just the set of Known Leaf Coders, we would have the set of Known Composite Coders, that don't need length prefixing. Anything else should just be length prefixed.

Eg. We add a list of the known Composite URNs that should not be length prefixed by the

Existing Leaf list:

var leafCoders = map[string]struct{}{

Where the check should go, so it's the same logic for wrapping unknowns. eg.

if len(c.GetComponentCoderIds()) == 0 && !leaf {

if (len(c.GetComponentCoderIds()) == 0 && !leaf) || !isKnownCompositeCoder(c) {

This has two risks though:

  1. Changed length prefixing behavior, may mean tests that are currently passing will fail. It'll be important to run the Java suite locally (I don't trust the Github action to run uncached when it's just a prism side change. Only noticed that after I got re-orged. If it takes less than 20m to run, it was cached and can't be trusted).

  2. The converse issue: What if the Python SDK doesn't know how to deal with a runner side wrapped length prefixed tuple coder? Then a Python fix would be needed. This would hopefully be evident in the test suite uses of tuple coder.

(There's a issue with Java Row coders failing deep in the Java SDK when they're wrapped in a length prefix. The introspection doesn't know how to skip the LP wrapper (see #32931).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added link to the tracking issue for the prism tuple coder issue to the PR description: #32636

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and of course big follow-up question: this coder is not new... presumably it already works, so why does this change require it to become runner-understood?

And specifically for this question: If it's not in the validates runner suite it's extremely hard to test when one doesn't use the SDK. In this case, there was exactly one test for it. There's only ~70 validates runner tests for Python.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[prism] Python Validates Runner (test_pack_combiners) - Unknown Coder not being processed (tuple)
3 participants