[Bug]: Illegal Argument Exception even though the type is matching #32075

Open
2 of 17 tasks
avkarthk opened this issue Aug 4, 2024 · 0 comments

What happened?

Exception trace:
java.lang.IllegalArgumentException: Type of @element must match the DoFn typeFiltering Failed Redemption status/ParMultiDo(FilterUsersBasedOnStatus).output [PCollection@1605886859]
    at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation(ParDo.java:654)
    at org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.getFieldAccess(FieldAccessVisitor.java:76)
    at org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.visitPrimitiveTransform(FieldAccessVisitor.java:48)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
    at org.apache.beam.sdk.util.construction.graph.ProjectionPushdownOptimizer.optimize(ProjectionPushdownOptimizer.java:63)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:83)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at
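
The message in the trace is about the @Element parameter type lining up with the DoFn's declared input type. As a rough, stdlib-only analogy of that kind of comparison (this is not Beam's implementation, and SketchFn, Details, MatchingFn, MismatchedFn and ElementTypeCheck are all hypothetical names), reflection can read the input type argument from a class's generic superclass and compare it with a method parameter:

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins; none of these types come from Beam.
abstract class SketchFn<InputT, OutputT> {}

class Details {}

// Parameter type equals the declared InputT.
class MatchingFn extends SketchFn<Details, Details> {
    public void process(Details element) {}
}

// Parameter type differs from the declared InputT.
class MismatchedFn extends SketchFn<Details, Details> {
    public void process(Object element) {}
}

final class ElementTypeCheck {

    // Reads InputT from the generic superclass and compares it with the
    // first parameter of the method named "process".
    static boolean elementTypeMatches(Class<?> fnClass) {
        ParameterizedType superType = (ParameterizedType) fnClass.getGenericSuperclass();
        Type declaredInput = superType.getActualTypeArguments()[0];
        for (Method m : fnClass.getDeclaredMethods()) {
            if (m.getName().equals("process") && m.getParameterCount() > 0) {
                return m.getGenericParameterTypes()[0].equals(declaredInput);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(elementTypeMatches(MatchingFn.class));
        System.out.println(elementTypeMatches(MismatchedFn.class));
    }
}
```

In this sketch a mismatch only arises when the parameter type and the type argument are different classes, which is why the report is surprising: the DoFn in question declares the same type in both places.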

Code snippet:

PCollection<KV<String, List<BatchUserDetails>>>  innerKVs = redeemAccessCode.apply("Extracting AC KV", Values.create());

       PCollection<List<BatchUserDetails>> listsOfBatchUserDetails = innerKVs.apply("Extracting BUD", Values.create());

       listsOfBatchUserDetails.apply("Log List of BatchUserDetails", ParDo.of(new DoFn<List<BatchUserDetails>, Void>() {
           @ProcessElement
           public void processElement(@Element List<BatchUserDetails> element, OutputReceiver<Void> out) {
               System.out.println("List<BatchUserDetails>: " + element);
           }
       }));

       PCollection<BatchUserDetails> redeemAccessCodeValues = listsOfBatchUserDetails
               .apply("Flatten Redemption BatchUserDetails", Flatten.<BatchUserDetails>iterables())
               .setTypeDescriptor(TypeDescriptor.of(BatchUserDetails.class));

       redeemAccessCodeValues.apply("Log BatchUserDetails", ParDo.of(new DoFn<BatchUserDetails, Void>() {
           @ProcessElement
           public void processElement(@Element BatchUserDetails element, OutputReceiver<Void> out) {
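               // "{}" is the SLF4J placeholder; Arrays.toString renders every declared field.
               logger.info("Element instance of {}", java.util.Arrays.toString(element.getClass().getDeclaredFields()));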
               System.out.println("BatchUserDetails: " + element);
           }
       }));

Adding the statement below causes the exception above to be thrown:
PCollection<BatchUserDetails> failedFilteredRedeemAccessCodeValues = redeemAccessCodeValues.apply(
        "Filtering Failed Redemption status",
        ParDo.of(new FilterUsersBasedOnStatusFn(TaskStatus.ACCESS_CODE_REDEMPTION_FAILED.getStatus())));

FilterUsersBasedOnStatusFn implementation:

public class FilterUsersBasedOnStatusFn extends DoFn<BatchUserDetails,BatchUserDetails> {

    private static final Logger logger = LoggerFactory.getLogger(FilterUsersBasedOnStatusFn.class);
    private final String status;

    public FilterUsersBasedOnStatusFn(String status){
        this.status = status;
    }

    @ProcessElement
    public void processElement(@Element BatchUserDetails batchUserDetails,
                               OutputReceiver<BatchUserDetails> filteredUserDetails) {
        String batchId = batchUserDetails.getBatchId();
       
        // Short-circuit && so equalsIgnoreCase is never called on a null status.
        if (status != null && status.equalsIgnoreCase(batchUserDetails.getTaskDetails().getTaskResponse().getStatus())) {
            filteredUserDetails.output(batchUserDetails);
        }
    }

}
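
Separately from the IllegalArgumentException, a null guard on status only protects the call to equalsIgnoreCase when the two operands are joined with the short-circuit operator &&; the non-short-circuit & evaluates both sides regardless. A minimal stdlib-only sketch of the difference, where StatusMatch and its method names are hypothetical and not part of the reported code:

```java
// Stdlib-only sketch; StatusMatch and its methods are hypothetical names.
final class StatusMatch {

    // '&' evaluates both operands, so a null status still reaches
    // equalsIgnoreCase and throws NullPointerException.
    static boolean matchesEager(String status, String actual) {
        return status != null & status.equalsIgnoreCase(actual);
    }

    // '&&' short-circuits: the right operand is skipped when status is null.
    static boolean matchesShortCircuit(String status, String actual) {
        return status != null && status.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        System.out.println(matchesShortCircuit(null, "FAILED"));
        System.out.println(matchesShortCircuit("failed", "FAILED"));
        try {
            matchesEager(null, "FAILED");
        } catch (NullPointerException e) {
            System.out.println("eager '&' threw NullPointerException");
        }
    }
}
```

This does not explain the type-matching error, which is raised while the pipeline is being constructed, before any element is processed.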

The application is deployed on AWS using Amazon Managed Service for Apache Flink.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner