Description
Hi there,
Got the error below (missing double-quote at the beginning of a field name) when creating a Kafka → Pub/Sub → BigQuery pipeline using the Pub/Sub to BigQuery template.
Failed to serialize json to table row: {SPEED=71.2, FREEWAY_ID=163, TIMESTAMP=2008-11-01 00:00:00, LONGITUDE=-117.155519, FREEWAY_DIR=S, LATITUDE=32.749679, LANE=3}
.....
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('S' (code 83)): was expecting double-quote to start field name
at [Source: {SPEED=71.2, FREEWAY_ID=163, TIMESTAMP=2008-11-01 00:00:00, LONGITUDE=-117.155519, FREEWAY_DIR=S, LATITUDE=32.749679, LANE=3}; line: 1, column: 3]
I think the error comes from the code below, which cannot handle a missing double-quote or a non-standard JSON-like format (such as the "key=value" format) produced by the upstream pipeline. I am wondering if you could add something similar to https://www.mkyong.com/java/jackson-was-expecting-double-quote-to-start-field-name/ to the source code: input format validation, a transformation step, and exception handling. For cases where an upstream is supposed to emit standard JSON but actually feeds non-standard payloads into Pub/Sub and then into this Dataflow template, that would be a useful safeguard.
/**
 * Converts a JSON string to a {@link TableRow} object. If the data fails to convert, a {@link
 * RuntimeException} will be thrown.
 *
 * @param json The JSON string to parse.
 * @return The parsed {@link TableRow} object.
 */
private static TableRow convertJsonToTableRow(String json) {
  TableRow row;
  // Parse the JSON into a {@link TableRow} object.
  try (InputStream inputStream =
      new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
    row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
  } catch (IOException e) {
    throw new RuntimeException("Failed to serialize json to table row: " + json, e);
  }
  return row;
}
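To illustrate the transformation step I have in mind, here is a minimal sketch (class and method names are hypothetical, not existing template code) that rewrites a Map.toString()-style payload like the one in the error message into standards-compliant JSON. It quotes every key and value as a string, so whether this is enough depends on BigQuery's type coercion against the target table schema:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadNormalizer {
  // Matches one "key=value" pair inside the {...} that Map.toString() emits.
  private static final Pattern PAIR = Pattern.compile("([\\w.-]+)=([^,}]*)");

  /** Rewrites {KEY=value, ...} into {"KEY": "value", ...}. */
  public static String normalize(String payload) {
    Matcher m = PAIR.matcher(payload);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      // Quote both the field name and the value; trim stray whitespace in values.
      String quoted = "\"" + m.group(1) + "\": \"" + m.group(2).trim() + "\"";
      m.appendReplacement(sb, Matcher.quoteReplacement(quoted));
    }
    m.appendTail(sb); // keeps the surrounding braces and any untouched text
    return sb.toString();
  }
}
```

For the payload from my error message, this would turn `{SPEED=71.2, FREEWAY_DIR=S}` into `{"SPEED": "71.2", "FREEWAY_DIR": "S"}`, which a strict JSON parser accepts.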
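And for the exception-handling side, a sketch of one possible shape (again hypothetical, not existing template code): a generic wrapper that gives a payload one normalize-and-retry pass before letting the failure propagate, which is also where a dead-letter output could hook in instead of rethrowing:

```java
import java.util.function.Function;

public class LenientDecode {
  /**
   * Tries the strict decoder first; on failure, applies the normalizer once
   * and retries. If both attempts fail, rethrows the second failure with the
   * first attached as a suppressed exception for debugging.
   */
  public static <T> T decodeLeniently(
      String payload,
      Function<String, T> strictDecoder,
      Function<String, String> normalizer) {
    try {
      return strictDecoder.apply(payload);
    } catch (RuntimeException first) {
      try {
        return strictDecoder.apply(normalizer.apply(payload));
      } catch (RuntimeException second) {
        second.addSuppressed(first);
        throw second;
      }
    }
  }
}
```

In the template this could wrap convertJsonToTableRow, with the normalizer handling the "key=value" case and the final exception carrying both payloads for the error message.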
Thank you!