Hi,
We're using the Pub/Sub Subscription to BigQuery template. We have data in both the PubsubMessage attributes and the body. Our body contains a top-level array without a field name, i.e.:

    [
      {"id": "item1"},
      {"id": "item2"}
    ]

The template had issues parsing this, so we added a simple UDF:

    function process(str) {
      // Wrap the top-level array in an object so the template can parse it.
      var arrayOfItems = JSON.parse(str);
      var outObject = {items: arrayOfItems};
      return JSON.stringify(outObject);
    }

When this template runs, it seems like the attributes are discarded after the UDF step.
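
For reference, here is a minimal sketch of what the UDF does with the sample body when run on its own in plain JavaScript. It assumes the template hands the UDF the message body as a string; `sampleBody` and `result` are just names I picked for this example.

```javascript
// The UDF from this issue: wraps a top-level JSON array in an object under "items".
function process(str) {
  var arrayOfItems = JSON.parse(str);
  var outObject = {items: arrayOfItems};
  return JSON.stringify(outObject);
}

// The sample body from this issue, as the string the UDF would receive (assumption).
var sampleBody = '[{"id": "item1"}, {"id": "item2"}]';

var result = process(sampleBody);
console.log(result); // {"items":[{"id":"item1"},{"id":"item2"}]}
```

So the UDF itself behaves as intended for the body. It never sees anything other than the string it is given.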
I'm not that well versed in Beam, but it seems that when the InvokeUDF step is built, everything but the message payload is discarded:

    PCollectionTuple udfOut =
        input
            // Map the incoming messages into FailsafeElements so we can recover from failures
            // across multiple transforms.
            .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
            .apply(
                "InvokeUDF",
                FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setSuccessTag(UDF_OUT)
                    .setFailureTag(UDF_DEADLETTER_OUT)
                    .build());

The PubsubMessageToFailsafeElementFn looks like this:

    static class PubsubMessageToFailsafeElementFn
        extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
      @ProcessElement
      public void processElement(ProcessContext context) {
        PubsubMessage message = context.element();
        context.output(
            FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
      }
    }

It seems to call message.getPayload(), which would probably cause the issue: only the payload string is handed to the UDF.
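
To illustrate what I think is happening, here is a small standalone sketch in plain JavaScript. It is not Beam code: `fakeMessage` and `toUdfInput` are hypothetical stand-ins I made up for a PubsubMessage and for the payload extraction in the MapToRecord step, and the attribute names are invented too.

```javascript
// Hypothetical stand-in for a PubsubMessage: attributes plus a JSON body.
var fakeMessage = {
  attributes: {source: "sensor-a", region: "eu"}, // made-up attribute names
  payload: '[{"id": "item1"}, {"id": "item2"}]'
};

// Hypothetical stand-in for the MapToRecord step as I read it:
// only the payload string is forwarded to the UDF.
function toUdfInput(message) {
  return message.payload;
}

// The UDF from this issue.
function process(str) {
  var arrayOfItems = JSON.parse(str);
  var outObject = {items: arrayOfItems};
  return JSON.stringify(outObject);
}

var udfOutput = JSON.parse(process(toUdfInput(fakeMessage)));

console.log(Object.keys(udfOutput));      // only "items"
console.log("attributes" in udfOutput);   // false
```

If the template really works this way, the UDF has no chance of forwarding the attributes, because they are never part of its input.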
So my question is: am I doing something wrong? Is there some way of getting both the attributes and the payload through the UDF, or do I have to modify the Java template?
Thanks in advance!