[Bug][Go SDK]: On prism, consuming the same PCollection as both side input and parallel input leads to duplicate elements. #27622

Closed
lostluck opened this issue Jul 23, 2023 · 0 comments · Fixed by #27737
@lostluck
Contributor

lostluck commented Jul 23, 2023

What happened?

While preparing the Go SDK to make prism the default runner, I discovered that a DoFn that also consumes its main input as a side input can cause the SDK to execute the DoFn multiple times.

```go
imp := beam.Impulse(s)
col0 := beam.ParDo(s, dofn1, imp)
sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0})
beam.ParDo(s, &int64Check{
	Name: "sum sideinput check",
	Want: []int{13, 14, 15},
}, sum)
```

In this case, dofn3x1 is executed 3 times, so every expected sum appears three times in the output:

```diff
 []int{
        13,
+       13, 13, 14, 14,
        14,
+       15, 15,
        15,
  }
```
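When triaging output like this, a quick check is whether every expected value is duplicated by the same factor, since uniform duplication suggests extra fan-out in the execution plan rather than a bug in the DoFn itself. The helper below is a hypothetical sketch, not part of the Beam test utilities; `duplicationFactor` and its semantics are assumptions, and it assumes the wanted values are distinct.

```go
package main

import "fmt"

// duplicationFactor is a hypothetical triage helper (not a Beam API).
// It returns the common number of times each wanted value appears in got,
// and true, if every wanted value appears the same non-zero number of times
// and got contains no other values. Otherwise it returns 0, false.
// Assumes the values in want are distinct.
func duplicationFactor(want, got []int) (int, bool) {
	counts := make(map[int]int, len(got))
	for _, v := range got {
		counts[v]++
	}
	wantSet := make(map[int]bool, len(want))
	for _, v := range want {
		wantSet[v] = true
	}
	if len(counts) != len(wantSet) {
		return 0, false // got has missing or unexpected values
	}
	factor := 0
	for v := range wantSet {
		c := counts[v]
		if c == 0 || (factor != 0 && c != factor) {
			return 0, false // missing value or non-uniform duplication
		}
		factor = c
	}
	return factor, true
}

func main() {
	want := []int{13, 14, 15}
	// Output values taken from the diff above.
	got := []int{13, 13, 13, 14, 14, 14, 15, 15, 15}
	f, ok := duplicationFactor(want, got)
	fmt.Println(f, ok)
}
```

Applied to the values in the diff, it reports a factor of 3, which matches the number of times col0 is listed among dofn3x1's inputs.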

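The cause is that the exec.Plan building logic doesn't take into account whether an input is a side input. Because dofn3x1 lists col0 once as its main input and twice as side inputs, the builder effectively registers the DoFn as three consumers of col0 and decides that the consumer must be multiplexed, so each element of col0 is delivered to the DoFn three times.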
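To make the failure mode concrete, here is a minimal, self-contained model of consumer wiring. It is not Beam's translate.go: the `transform` type, its fields, and `buildConsumers` are hypothetical simplifications, assuming only that the builder adds one consumer entry per input, keyed by PCollection ID, without checking whether the input is a side input.

```go
package main

import "fmt"

// transform is a hypothetical, simplified stand-in for a pipeline transform.
// Inputs maps each local input name to the PCollection ID it reads.
type transform struct {
	ID     string
	Inputs map[string]string
}

// buildConsumers records one consumer entry per input without checking
// whether that input is a side input, mirroring the flaw described above.
func buildConsumers(ts []transform) map[string][]string {
	consumers := make(map[string][]string)
	for _, t := range ts {
		for _, pcol := range t.Inputs {
			consumers[pcol] = append(consumers[pcol], t.ID)
		}
	}
	return consumers
}

func main() {
	sum := transform{
		ID: "dofn3x1",
		Inputs: map[string]string{
			"i0": "col0", // main (parallel) input
			"i1": "col0", // side input
			"i2": "col0", // side input
		},
	}
	consumers := buildConsumers([]transform{sum})
	// Three entries for col0: a plan builder that multiplexes on
	// multiple consumers would feed each element to dofn3x1 three times.
	fmt.Println(len(consumers["col0"]), consumers["col0"])
}
```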

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L194

I believe other runners rename the PCollection inputs used as side inputs, which avoids this issue by accident.

This should be validated with additional regression test pipelines, both for side inputs and for Flattens (which the runner may also direct the SDK to execute). These would confirm whether this is also an issue on Dataflow and other portable runners. However, I don't suspect this is a common pattern.

In any case, since prism is intended to challenge SDK assumptions, I'll have an SDK-side fix for 2.50. There's sufficient information to determine whether an input is a side input; it simply isn't used at present.
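As I understand the Beam runner API, a ParDo's payload lists its side inputs keyed by local input name, which is the kind of information that could drive this check. The sketch below models that idea with hypothetical `Inputs` and `SideInputs` fields; it is a simplified illustration, not code taken from the fix that closed this issue.

```go
package main

import "fmt"

// transform is a hypothetical simplification: Inputs maps local input names
// to PCollection IDs, and SideInputs holds the local names that are side inputs.
type transform struct {
	ID         string
	Inputs     map[string]string
	SideInputs map[string]bool
}

// buildConsumers wires a transform as a consumer of a PCollection only for
// its parallel (non-side) inputs. Side inputs are read through a separate
// path, so they must not add extra consumer entries.
func buildConsumers(ts []transform) map[string][]string {
	consumers := make(map[string][]string)
	for _, t := range ts {
		for local, pcol := range t.Inputs {
			if t.SideInputs[local] {
				continue // side input: not a parallel consumer
			}
			consumers[pcol] = append(consumers[pcol], t.ID)
		}
	}
	return consumers
}

func main() {
	sum := transform{
		ID:         "dofn3x1",
		Inputs:     map[string]string{"i0": "col0", "i1": "col0", "i2": "col0"},
		SideInputs: map[string]bool{"i1": true, "i2": true},
	}
	consumers := buildConsumers([]transform{sum})
	// Only the main input is wired, so no multiplexing is needed.
	fmt.Println(len(consumers["col0"]), consumers["col0"])
}
```

With side inputs excluded, col0 has a single consumer entry, so each element reaches dofn3x1 once.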

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@lostluck lostluck self-assigned this Jul 23, 2023
@github-actions github-actions bot added the P2 label Jul 23, 2023
@lostluck lostluck changed the title [Bug][Go SDK]: On prism consuming the same PCollection as both side input and parallel input leads to duplicate elements. [Bug][Go SDK]: On prism, consuming the same PCollection as both side input and parallel input leads to duplicate elements. Jul 23, 2023
@github-actions github-actions bot added this to the 2.50.0 Release milestone Jul 31, 2023