
Add format transformation for json-like input for BigQueryConverters #25

@JessieJingxuGao


Hi there,

I ran into the issue below (a missing double quote at the start of a field name) when creating a Kafka to Pub/Sub to BigQuery pipeline using the Pub/Sub to BigQuery template.

Failed to serialize json to table row: {SPEED=71.2, FREEWAY_ID=163, TIMESTAMP=2008-11-01 00:00:00, LONGITUDE=-117.155519, FREEWAY_DIR=S, LATITUDE=32.749679, LANE=3}
.....
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('S' (code 83)): was expecting double-quote to start field name
 at [Source: {SPEED=71.2, FREEWAY_ID=163, TIMESTAMP=2008-11-01 00:00:00, LONGITUDE=-117.155519, FREEWAY_DIR=S, LATITUDE=32.749679, LANE=3}; line: 1, column: 3]

I think the error comes from the code below, which cannot handle a missing double quote or other non-standard JSON-like input (such as the "key=value" format) produced by the upstream pipeline. Could you add something similar to https://www.mkyong.com/java/jackson-was-expecting-double-quote-to-start-field-name/ to the source code for our case: input format validation, transformation, and then exception handling?
When an upstream system can accept standard JSON but feeds non-standard JSON into Pub/Sub and then into this Dataflow template, it would be better to have this handled in the template.

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/common/BigQueryConverters.java

/**
  * Converts a JSON string to a {@link TableRow} object. If the data fails to convert, a {@link
  * RuntimeException} will be thrown.
  *
  * @param json The JSON string to parse.
  * @return The parsed {@link TableRow} object.
  */
 private static TableRow convertJsonToTableRow(String json) {
   TableRow row;
   // Parse the JSON into a {@link TableRow} object.
   try (InputStream inputStream =
       new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
     row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER);

   } catch (IOException e) {
     throw new RuntimeException("Failed to serialize json to table row: " + json, e);
   }

   return row;
 }
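
To illustrate the kind of transformation I mean, here is a minimal sketch that rewrites the "key=value" payload from the error above into strict JSON before it would reach convertJsonToTableRow. The class and method names (KeyValueToJson, parse, toJson, quote) are hypothetical and not part of DataflowTemplates. It assumes the input is wrapped in braces, that pairs are separated by commas, and that no key or value contains a comma. Every value is emitted as a JSON string, so type handling is left open.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of DataflowTemplates.
class KeyValueToJson {

  /** Parses "{K1=V1, K2=V2}" into an ordered map. Assumes no key or value contains a comma. */
  static Map<String, String> parse(String input) {
    String trimmed = input.trim();
    if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
      throw new IllegalArgumentException("Expected input wrapped in braces: " + input);
    }
    String body = trimmed.substring(1, trimmed.length() - 1).trim();
    Map<String, String> fields = new LinkedHashMap<>();
    if (body.isEmpty()) {
      return fields;
    }
    for (String pair : body.split(",\\s*")) {
      // Split on the first '=' only, so values may themselves contain '='.
      int eq = pair.indexOf('=');
      if (eq <= 0) {
        throw new IllegalArgumentException("Expected key=value but got: " + pair);
      }
      fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
    }
    return fields;
  }

  /** Converts the key=value text into a JSON object whose values are all strings. */
  static String toJson(String input) {
    StringJoiner json = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, String> e : parse(input).entrySet()) {
      json.add(quote(e.getKey()) + ":" + quote(e.getValue()));
    }
    return json.toString();
  }

  /** Wraps a string in double quotes, escaping backslashes and embedded quotes. */
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  public static void main(String[] args) {
    String raw = "{SPEED=71.2, FREEWAY_ID=163, TIMESTAMP=2008-11-01 00:00:00, "
        + "LONGITUDE=-117.155519, FREEWAY_DIR=S, LATITUDE=32.749679, LANE=3}";
    System.out.println(toJson(raw));
  }
}
```

A converter along these lines could be tried only after strict JSON parsing fails, so that well-formed messages keep their current behavior and malformed ones that still cannot be parsed raise a clear exception.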

Thank you!
