Skip to content

Commit

Permalink
DatastreamToSQL: Add failing SQL statement to error message logged
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 437576372
  • Loading branch information
dhercher authored and cloud-teleport committed Mar 27, 2022
1 parent eb7d31c commit 9a72151
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,10 @@ private void executeBatchSingleStatementFormatting()
statement.executeUpdate(formattedStatement);
connection.commit();
} catch (SQLException exception) {
LOG.error("SQLException Occurred: {}", exception.toString());
LOG.error(
"SQLException Occurred: {} while executing statement: {}",
exception.toString(),
formattedStatement);
connection.rollback();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ public void processElement(ProcessContext context) {

// Null rows suggest no DML is required.
if (dmlInfo != null) {
LOG.debug("Output Data: {}", jsonString);
context.output(KV.of(dmlInfo.getStateWindowKey(), dmlInfo));
} else {
LOG.debug("Skipping Null DmlInfo: {}", jsonString);
}
} catch (IOException e) {
// TODO(dhercher): Push failure to DLQ collection
Expand Down Expand Up @@ -209,6 +212,7 @@ public DmlInfo convertJsonToDmlInfo(JsonNode rowObj, String failsafeValue) {
Map<String, String> tableSchema = this.getTableSchema(catalogName, schemaName, tableName);
if (tableSchema.isEmpty()) {
// If the table DNE we return null (NOOP)
LOG.debug("Table Not Found: {}.{}.{}", catalogName, schemaName, tableName);
return null;
}

Expand Down

0 comments on commit 9a72151

Please sign in to comment.