
PubSub to BigQuery JavaScript UDF destroys attributes #69

@mollyporph


Hi,
We're using the Pub/Sub Subscription to BigQuery template. We have data in both the Pub/Sub message attributes and the message body. Our body is a top-level JSON array with no field name, i.e.:

[
 {"id": "item1"},
 {"id": "item2"}
]

The template had trouble parsing this, so we added a simple UDF:

// Wrap the top-level array in an object under the "items" key.
function process(str) {
    var arrayOfItems = JSON.parse(str);
    var outObject = {items: arrayOfItems};
    return JSON.stringify(outObject);
}

When the template runs, the attributes appear to be discarded at the UDF step.

I'm not very familiar with Beam, but it looks like the InvokeUDF step, as it is built here, discards everything except the message payload:

 PCollectionTuple udfOut =
          input
              // Map the incoming messages into FailsafeElements so we can recover from failures
              // across multiple transforms.
              .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
              .apply(
                  "InvokeUDF",
                  FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setSuccessTag(UDF_OUT)
                      .setFailureTag(UDF_DEADLETTER_OUT)
                      .build());

PubsubMessageToFailsafeElementFn looks like this:

 static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    }
  }

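It builds the FailsafeElement's current value from message.getPayload(), so the string handed to the UDF contains only the payload and none of the attributes. That is probably what causes the issue.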
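To make the problem concrete, here is a simplified model of the two steps written in JavaScript. It is an illustration only, not template code: mapToRecord and invokeUdf are hypothetical stand-ins for the MapToRecord and InvokeUDF steps, the message shape (payload, attributes) is assumed, and the "source" attribute is made up.

```javascript
// UDF from above: wraps the top-level array under "items".
function process(str) {
  var arrayOfItems = JSON.parse(str);
  var outObject = {items: arrayOfItems};
  return JSON.stringify(outObject);
}

// Hypothetical stand-in for PubsubMessageToFailsafeElementFn: the original
// message is kept, but the current value is only the payload string.
function mapToRecord(message) {
  return {originalPayload: message, payload: message.payload};
}

// Hypothetical stand-in for InvokeUDF: the UDF only ever sees element.payload,
// and its return value is what the output row is built from.
function invokeUdf(element, udf) {
  return udf(element.payload);
}

var message = {
  payload: '[{"id": "item1"}, {"id": "item2"}]',
  attributes: {source: "sensor-a"} // hypothetical attribute
};

var row = JSON.parse(invokeUdf(mapToRecord(message), process));
console.log(JSON.stringify(row)); // {"items":[{"id":"item1"},{"id":"item2"}]}
```

The attributes never reach process(), so nothing the UDF returns can contain them.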

So my question is: am I doing something wrong? Is there a way to get both the attributes and the payload through the UDF, or do I have to modify the Java template?
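One possible workaround, sketched under assumptions: if the Java template were modified so that the string handed to the UDF is a JSON envelope holding both the payload and the attributes, e.g. {"payload": "...", "attributes": {...}}, the UDF could unpack it and copy the attributes into the output row. This envelope format is hypothetical (the stock template passes only the payload), and the output field name "attributes" is an assumption; the BigQuery table would need a matching column.

```javascript
// Hypothetical UDF input from a MODIFIED template (not the stock behavior):
// {"payload": "<original body>", "attributes": {"key": "value", ...}}
function process(str) {
  var envelope = JSON.parse(str);
  // The original body is still a top-level JSON array.
  var arrayOfItems = JSON.parse(envelope.payload);
  var outObject = {
    items: arrayOfItems,
    // Copied as a nested object; assumes a matching RECORD column
    // named "attributes" in the BigQuery table.
    attributes: envelope.attributes || {}
  };
  return JSON.stringify(outObject);
}

// Example envelope, built the way the modified template is assumed to build it.
var input = JSON.stringify({
  payload: '[{"id": "item1"}, {"id": "item2"}]',
  attributes: {source: "sensor-a"} // hypothetical attribute
});
console.log(process(input));
// {"items":[{"id":"item1"},{"id":"item2"}],"attributes":{"source":"sensor-a"}}
```

On the Java side, the change would go in PubsubMessageToFailsafeElementFn, which would need to serialize message.getAttributeMap() together with the payload instead of passing the payload alone. The sketch sticks to ES5 syntax (var, no arrow functions) in case the template's script engine does not support newer JavaScript.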

Thanks in advance!
