Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sanitized column name in some destinations' raw table outputs #5026

Merged
merged 2 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions airbyte-commons/src/main/java/io/airbyte/commons/text/Names.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class Names {
public static final String NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN = "[^\\p{Alnum}_]";

/**
* Converts any UTF8 string to a string with only alphanumeric and _ characters.
* Converts any UTF8 string to a string with only alphanumeric and _ characters without preserving
* accent characters.
*
* @param s string to convert
* @return cleaned string
Expand All @@ -44,28 +45,6 @@ public static String toAlphanumericAndUnderscore(String s) {
.replaceAll(NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN, "_");
}

/**
* Concatenate Strings together, but handles the case where the strings are already quoted
*/
public static String concatQuotedNames(final String inputStr1, final String inputStr2) {
boolean anyQuotes = false;
String unquotedStr1 = inputStr1;
String unquotedStr2 = inputStr2;
if (inputStr1.startsWith("\"")) {
unquotedStr1 = inputStr1.substring(1, inputStr1.length() - 1);
anyQuotes = true;
}
if (inputStr2.startsWith("\"")) {
unquotedStr2 = inputStr2.substring(1, inputStr2.length() - 1);
anyQuotes = true;
}
if (anyQuotes) {
return "\"" + unquotedStr1 + unquotedStr2 + "\"";
} else {
return unquotedStr1 + unquotedStr2;
}
}

public static String doubleQuote(String value) {
return internalQuote(value, '"');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.3.8",
"dockerImageTag": "0.3.9",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.8
dockerImageTag: 0.3.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
Expand Down Expand Up @@ -58,7 +58,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@

package io.airbyte.integrations.destination;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.text.Names;
import io.airbyte.commons.util.MoreIterators;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StandardNameTransformer implements NamingConventionTransformer {

private static final String NON_JSON_PATH_CHARACTERS_PATTERN = "['\"`]";

@Override
public String getIdentifier(String name) {
return convertStreamName(name);
Expand All @@ -48,4 +56,37 @@ protected String convertStreamName(String input) {
return Names.toAlphanumericAndUnderscore(input);
}

/**
* Rebuild a JsonNode adding sanitized property names (a subset of special characters replaced by
* underscores) while keeping original property names too. This is needed by some destinations as
* their json extract functions have limitations on how such special characters are parsed. These
* naming rules may be different to schema/table/column naming conventions.
*/
public static JsonNode formatJsonPath(JsonNode root) {
if (root.isObject()) {
final Map<String, JsonNode> properties = new HashMap<>();
var keys = Jsons.keys(root);
for (var key : keys) {
final JsonNode property = root.get(key);
// keep original key
properties.put(key, formatJsonPath(property));
}
for (var key : keys) {
final JsonNode property = root.get(key);
final String formattedKey = key.replaceAll(NON_JSON_PATH_CHARACTERS_PATTERN, "_");
if (!properties.containsKey(formattedKey)) {
// duplicate property in a formatted key to be extracted in normalization
properties.put(formattedKey, formatJsonPath(property));
}
}
return Jsons.jsonNode(properties);
} else if (root.isArray()) {
return Jsons.jsonNode(MoreIterators.toList(root.elements()).stream()
.map(StandardNameTransformer::formatJsonPath)
.collect(Collectors.toList()));
} else {
return root;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.version=0.3.9
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -112,10 +113,10 @@ protected JsonNode formatRecord(Schema schema, AirbyteRecordMessage recordMessag
// use BQ helpers to string-format correctly.
long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();

final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
return Jsons.jsonNode(ImmutableMap.of(
JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(),
JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(recordMessage.getData()),
JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(formattedData),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.destination.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand Down Expand Up @@ -115,7 +116,7 @@ protected void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records

for (AirbyteRecordMessage record : records) {
var uuid = UUID.randomUUID().toString();
var jsonData = Jsons.serialize(record.getData());
var jsonData = Jsons.serialize(formatData(record.getData()));
var emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt()));
csvPrinter.printRecord(uuid, jsonData, emittedAt);
}
Expand All @@ -126,6 +127,10 @@ protected void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records
}
}

protected JsonNode formatData(JsonNode data) {
return data;
}

@Override
public String truncateTableQuery(JdbcDatabase database, String schemaName, String tableName) {
return String.format("TRUNCATE TABLE %s.%s;\n", schemaName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package io.airbyte.integrations.destination.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.File;
Expand Down Expand Up @@ -92,6 +94,11 @@ private void loadDataIntoTable(JdbcDatabase database,
});
}

@Override
protected JsonNode formatData(JsonNode data) {
return StandardNameTransformer.formatJsonPath(data);
}

void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException {
boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database);
if (!localFileEnabled) {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.3.9 | 2021-07-28 | [#5026](https://github.com/airbytehq/airbyte/pull/5026) | Add sanitized json fields in raw tables to handle quotes in column names |
| 0.3.6 | 2021-06-18 | [#3947](https://github.com/airbytehq/airbyte/issues/3947) | Service account credentials are now optional. |
| 0.3.4 | 2021-06-07 | [#3277](https://github.com/airbytehq/airbyte/issues/3277) | Add dataset location option |

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ As a result, Airbyte MySQL destination forces all identifier (table, schema and

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.10 | 2021-07-28 | [#5026](https://github.com/airbytehq/airbyte/pull/5026) | Add sanitized json fields in raw tables to handle quotes in column names |
| 0.1.7 | 2021-07-09 | [#4651](https://github.com/airbytehq/airbyte/pull/4651) | Switch normalization flag on so users can use normalization. |
| 0.1.6 | 2021-07-03 | [#4531](https://github.com/airbytehq/airbyte/pull/4531) | Added normalization for MySQL. |
| 0.1.5 | 2021-07-03 | [#3973](https://github.com/airbytehq/airbyte/pull/3973) | Added `AIRBYTE_ENTRYPOINT` for kubernetes support. |
Expand Down