We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exception trace: java.lang.IllegalArgumentException: Type of @element must match the DoFn typeFiltering Failed Redemption status/ParMultiDo(FilterUsersBasedOnStatus).output [PCollection@1605886859]\n\tat org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation(ParDo.java:654)\n\tat org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.getFieldAccess(FieldAccessVisitor.java:76)\n\tat org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.visitPrimitiveTransform(FieldAccessVisitor.java:48)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)\n\tat org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)\n\tat org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)\n\tat org.apache.beam.sdk.util.construction.graph.ProjectionPushdownOptimizer.optimize(ProjectionPushdownOptimizer.java:63)\n\tat org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:83)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)\n\tat
Code Snippet :
PCollection<KV<String, List<BatchUserDetails>>> innerKVs = redeemAccessCode.apply("Extracting AC KV", Values.create()); PCollection<List<BatchUserDetails>> listsOfBatchUserDetails = innerKVs.apply("Extracting BUD", Values.create()); listsOfBatchUserDetails.apply("Log List of BatchUserDetails", ParDo.of(new DoFn<List<BatchUserDetails>, Void>() { @ProcessElement public void processElement(@Element List<BatchUserDetails> element, OutputReceiver<Void> out) { System.out.println("List<BatchUserDetails>: " + element); } })); PCollection<BatchUserDetails> redeemAccessCodeValues = listsOfBatchUserDetails .apply("Flatten Redemption BatchUserDetails", Flatten.<BatchUserDetails>iterables()) .setTypeDescriptor(TypeDescriptor.of(BatchUserDetails.class)); redeemAccessCodeValues.apply("Log BatchUserDetails", ParDo.of(new DoFn<BatchUserDetails, Void>() { @ProcessElement public void processElement(@Element BatchUserDetails element, OutputReceiver<Void> out) { logger.info("Element isntance of { }",element.getClass().getDeclaredFields()); System.out.println("BatchUserDetails: " + element); } }));
Adding the below two lines the above exception trace is thrown PCollection failedFilteredRedeemAccessCodeValues = redeemAccessCodeValues.apply( "Filtering Failed Redemption status", ParDo.of(new FilterUsersBasedOnStatusFn(TaskStatus.ACCESS_CODE_REDEMPTION_FAILED.getStatus())) );
FilterUsersBasedOnStatusFn Implementation:
public class FilterUsersBasedOnStatusFn extends DoFn<BatchUserDetails,BatchUserDetails> { private static final Logger logger = LoggerFactory.getLogger(FilterUsersBasedOnStatusFn.class); private final String status; public FilterUsersBasedOnStatusFn(String status){ this.status = status; } @ProcessElement public void processElement(@Element BatchUserDetails batchUserDetails, OutputReceiver<BatchUserDetails> filteredUserDetails) { String batchId = batchUserDetails.getBatchId(); if (status != null & status.equalsIgnoreCase(batchUserDetails.getTaskDetails().getTaskResponse().getStatus())) { filteredUserDetails.output(batchUserDetails); } } }
The Application is deployed in AWS as a Managed Flink Service
Priority: 2 (default / most bugs should be filed as P2)
The text was updated successfully, but these errors were encountered:
No branches or pull requests
What happened?
Exception trace:
java.lang.IllegalArgumentException: Type of @element must match the DoFn typeFiltering Failed Redemption status/ParMultiDo(FilterUsersBasedOnStatus).output [PCollection@1605886859]\n\tat org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation(ParDo.java:654)\n\tat org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.getFieldAccess(FieldAccessVisitor.java:76)\n\tat org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.visitPrimitiveTransform(FieldAccessVisitor.java:48)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)\n\tat org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)\n\tat org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)\n\tat org.apache.beam.sdk.util.construction.graph.ProjectionPushdownOptimizer.optimize(ProjectionPushdownOptimizer.java:63)\n\tat org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:83)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)\n\tat
Code Snippet :
Adding the below two lines the above exception trace is thrown
PCollection failedFilteredRedeemAccessCodeValues = redeemAccessCodeValues.apply(
"Filtering Failed Redemption status",
ParDo.of(new FilterUsersBasedOnStatusFn(TaskStatus.ACCESS_CODE_REDEMPTION_FAILED.getStatus()))
);
FilterUsersBasedOnStatusFn Implementation:
The Application is deployed in AWS as a Managed Flink Service
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
The text was updated successfully, but these errors were encountered: