Destination Oracle: change table limit and format record data #5542

Merged
merged 5 commits into from
Aug 24, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "3986776d-2319-4de9-8af8-db14c0996e72",
"name": "Oracle (Alpha)",
"dockerRepository": "airbyte/destination-oracle",
- "dockerImageTag": "0.1.5",
+ "dockerImageTag": "0.1.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/oracle"
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
-  dockerImageTag: 0.1.5
+  dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
name: Kafka
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

- LABEL io.airbyte.version=0.1.5
+ LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ dependencies {
implementation "com.oracle.database.jdbc:ojdbc8-production:19.7.0.0"

testImplementation 'org.apache.commons:commons-lang3:3.11'
- testImplementation 'org.testcontainers:oracle-xe:1.15.2'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-oracle')
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String convertStreamName(String input) {
if (!result.isEmpty() && result.charAt(0) == '_') {
result = result.substring(1);
}
- return maxStringLength(result, 30);
+ return maxStringLength(result, 128);
}

}
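
For context on the 30 to 128 change: Oracle 12.2 and later accept identifiers of up to 128 bytes, while older releases such as 11g stop at 30. Below is a minimal sketch of the truncation step. It assumes `maxStringLength` simply cuts the string at the limit; the helper's real implementation is not shown in this diff, and `IdentifierLimitSketch` is a hypothetical name. The sketch counts characters, whereas the Oracle limit is expressed in bytes, so multi-byte names would need extra care.

```java
public class IdentifierLimitSketch {

  // Assumed behavior: return the input unchanged when it fits,
  // otherwise keep only the first maxLength characters.
  public static String maxStringLength(String value, int maxLength) {
    return value.length() <= maxLength ? value : value.substring(0, maxLength);
  }

  public static void main(String[] args) {
    String longName = "a".repeat(150);
    System.out.println(maxStringLength(longName, 30).length());  // old limit
    System.out.println(maxStringLength(longName, 128).length()); // new limit
    System.out.println(maxStringLength("users", 128));           // short names pass through
  }
}
```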
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -150,8 +152,9 @@ private static void insertRawRecordsInSingleQuery(String tableName,
int i = 1;
for (final AirbyteRecordMessage message : records) {
// 1-indexed
final JsonNode formattedData = StandardNameTransformer.formatJsonPath(message.getData());
Contributor

this is modifying the data itself. Is there any way around this?

Contributor

agreed with sherif. the name transformer was intended to transform "names" (e.g. table names and column names). in general we want to avoid modifying the data if at all possible.

Member Author

I'm doing the same as in the BigQuery destination (this changes the JSON path to remove any unsupported characters in the stream name; it does not change the original data):

final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
return Jsons.jsonNode(ImmutableMap.of(
JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(),
JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(formattedData),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
}

Member Author

I updated the comment in the PR, sorry for the confusion: Oracle 19 json_value doesn't allow ' or " in keys, so we need to clean the key names in some records using a formatting function. Example: column`_'with\"_quotes becomes column___with__quotes.

Member Author
@marcosmarxm, Aug 23, 2021

Example of AIRBYTE_DATA saved using this function. The original data is kept but to run normalization I need a "cleaned" field name.

'{"date":"2020-08-29T00:00:00Z",
"partition":{"DATA":[{"currency":"EUR"}],
"double_array_data":[[{"id":"EUR"}]],
"column___with__quotes":[{"currency":"EUR"}], --formatted
"column`_\'with\\"_quotes":[{"currency":"EUR"}]}, --original 
"id":4.2}'

Contributor

fair enough. this feels like a weird sacrifice we are making for dbt.

Member Author

This is needed for the destination itself, because the source is writing a JSON key that is not supported.
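
The behavior discussed in this thread (original keys kept, a cleaned duplicate added next to them) can be sketched roughly as follows. This is not the code of `StandardNameTransformer.formatJsonPath`; the class name, both method names, and the replacement rule (anything other than ASCII letters, digits, and underscore becomes `_`) are assumptions made for illustration. It works on plain `Map`/`List` values instead of Jackson nodes so that it runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JsonKeyCleanerSketch {

  // Assumed rule: replace every character that is not a letter, digit, or underscore with '_'.
  public static String cleanKey(String key) {
    return key.replaceAll("[^A-Za-z0-9_]", "_");
  }

  // Recursively copy a parsed JSON value (Map, List, or scalar). Each object keeps
  // its original keys and gains a cleaned duplicate for every key that needed cleaning.
  public static Object withCleanedKeys(Object node) {
    if (node instanceof Map) {
      Map<?, ?> in = (Map<?, ?>) node;
      Map<String, Object> out = new LinkedHashMap<>();
      // First pass: keep every original key.
      for (Map.Entry<?, ?> e : in.entrySet()) {
        out.put(String.valueOf(e.getKey()), withCleanedKeys(e.getValue()));
      }
      // Second pass: add the cleaned name unless that name is already present.
      for (Map.Entry<?, ?> e : in.entrySet()) {
        String cleaned = cleanKey(String.valueOf(e.getKey()));
        if (!out.containsKey(cleaned)) {
          out.put(cleaned, withCleanedKeys(e.getValue()));
        }
      }
      return out;
    }
    if (node instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<?>) node) {
        out.add(withCleanedKeys(item));
      }
      return out;
    }
    return node; // scalars are returned untouched
  }

  public static void main(String[] args) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("column`_'with\"_quotes", List.of(Map.of("currency", "EUR")));
    // Prints both the original key and its cleaned duplicate.
    System.out.println(((Map<?, ?>) withCleanedKeys(record)).keySet());
  }
}
```

Values are never rewritten here, only key names, which matches the point made in the thread that the record data itself is preserved.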

statement.setString(i, uuidSupplier.get().toString());
- statement.setString(i + 1, Jsons.serialize(message.getData()));
+ statement.setString(i + 1, Jsons.serialize(formattedData));
statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt())));
i += 3;
}
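
The `i`, `i + 1`, `i + 2`, `i += 3` pattern in this loop maps each record onto three consecutive, 1-indexed JDBC parameters. A small sketch of that arithmetic, with a hypothetical class and method name and no database involved:

```java
public class PlaceholderIndexSketch {

  // JDBC parameters are 1-indexed, so the record at 0-based position k
  // with n columns per record starts at parameter 1 + k * n.
  public static int firstParameterIndex(int recordPosition, int columnsPerRecord) {
    return 1 + recordPosition * columnsPerRecord;
  }

  public static void main(String[] args) {
    int columnsPerRecord = 3; // id, data, emitted_at
    for (int k = 0; k < 3; k++) {
      int i = firstParameterIndex(k, columnsPerRecord);
      System.out.printf("record %d: id=%d data=%d emitted_at=%d%n", k, i, i + 1, i + 2);
    }
  }
}
```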
Original file line number Diff line number Diff line change
Expand Up @@ -25,72 +25,46 @@
package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;

public class OracleIntegrationTest extends DestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(OracleIntegrationTest.class);
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private static OracleContainer db;
private static JsonNode baseConfig;
private ExtendedNameTransformer namingResolver = new OracleNameTransformer();
private JsonNode config;
private static JsonNode config;

@BeforeAll
protected static void init() {
db = new OracleContainer("epiclabs/docker-oracle-xe-11g");
db.start();
public JsonNode getStaticConfig() {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
}

@Override
protected String getImageName() {
return "airbyte/destination-oracle:dev";
}

private JsonNode getConfig(OracleContainer db) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("schema", "testSchema")
.put("sid", db.getSid())
.build());
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected JsonNode getFailCheckConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", "wrong password")
.put("schema", "public")
.put("port", db.getFirstMappedPort())
.put("sid", db.getSid())
.build());
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName, String namespace, JsonNode streamSchema) throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
Expand All @@ -116,6 +90,13 @@ protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, Strin
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected JsonNode getFailCheckConfig() {
final JsonNode invalidConfig = Jsons.clone(config);
((ObjectNode) invalidConfig).put("password", "wrong password");
return invalidConfig;
}
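
The new `getFailCheckConfig` copies the valid config and overrides only the password. The same copy-then-override idea is sketched below with a plain `Map` standing in for the Jackson `JsonNode` (an assumption so the example runs with the JDK alone; `FailCheckConfigSketch` and `withWrongPassword` are hypothetical names). A `HashMap` copy is shallow, which is enough for a flat config of strings.

```java
import java.util.HashMap;
import java.util.Map;

public class FailCheckConfigSketch {

  // Copy the valid config and replace only the password, leaving the original untouched.
  public static Map<String, String> withWrongPassword(Map<String, String> validConfig) {
    Map<String, String> invalid = new HashMap<>(validConfig);
    invalid.put("password", "wrong password");
    return invalid;
  }

  public static void main(String[] args) {
    Map<String, String> valid = new HashMap<>();
    valid.put("host", "localhost");      // placeholder value
    valid.put("password", "real-secret"); // placeholder value
    Map<String, String> invalid = withWrongPassword(valid);
    System.out.println(valid.get("password"));
    System.out.println(invalid.get("password"));
  }
}
```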

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
Expand All @@ -130,28 +111,27 @@ protected List<String> resolveIdentifier(String identifier) {
}

private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException {
List<org.jooq.Record> result = Databases.createOracleDatabase(db.getUsername(), db.getPassword(), db.getJdbcUrl())
.query(ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT))
.stream()
.collect(Collectors.toList()));
List<org.jooq.Record> result = getDatabase().query(ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT))
.stream()
.collect(Collectors.toList()));
return result
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList());
}

private static Database getDatabase(JsonNode config) {
private static Database getDatabase() {
// todo (cgardens) - rework this abstraction so that we do not have to pass a null into the
// constructor. at least explicitly handle it, even if the impl doesn't change.
return Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
baseConfig.get("username").asText(),
baseConfig.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
baseConfig.get("host").asText(),
baseConfig.get("port").asText(),
baseConfig.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver",
null);
}
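
As a quick illustration of the URL that `getDatabase` assembles, here is the same `String.format` call applied to placeholder values. The `Map` stands in for the JSON config, and the host, port, and SID shown are made up.

```java
import java.util.Map;

public class OracleJdbcUrlSketch {

  // Same format string as in the test: host, port, and sid are read from the config.
  public static String jdbcUrl(Map<String, String> config) {
    return String.format("jdbc:oracle:thin:@//%s:%s/%s",
        config.get("host"),
        config.get("port"),
        config.get("sid"));
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of("host", "localhost", "port", "1521", "sid", "XE");
    System.out.println(jdbcUrl(config)); // jdbc:oracle:thin:@//localhost:1521/XE
  }
}
```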
Expand All @@ -172,21 +152,18 @@ private List<String> getAllTables(Database db) {

@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
config = getConfig(db);

final Database database = getDatabase(config);
database.query(ctx -> {
ctx.execute("alter database default tablespace users");
return null;
});
Comment on lines -178 to -181
Member Author

I removed this because it requires a very strong privilege to execute. I'm running an AWS instance where I can change the permissions of my admin user.

// config = getConfig(db);
baseConfig = getStaticConfig();
config = Jsons.clone(baseConfig);
final Database database = getDatabase();
allTables = getAllTables(database);
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
config = getConfig(db);
config = getStaticConfig();

final Database database = getDatabase(config);
final Database database = getDatabase();
var tables = getAllTables(database);
tables.removeAll(allTables);
try {
Expand All @@ -201,10 +178,4 @@ protected void tearDown(TestDestinationEnv testEnv) {
}
}
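
Since the test now runs against a shared database instead of a throwaway container, `tearDown` has to work out which tables the test itself created: it lists the tables again and removes the ones recorded in `setup`. A minimal sketch of that set difference, with hypothetical names and no database access:

```java
import java.util.ArrayList;
import java.util.List;

public class CreatedTablesSketch {

  // Tables present after the test but not before it are the ones the test created.
  public static List<String> createdDuringTest(List<String> before, List<String> after) {
    List<String> created = new ArrayList<>(after); // copy so the caller's list is not modified
    created.removeAll(before);
    return created;
  }

  public static void main(String[] args) {
    List<String> before = List.of("EXISTING_A", "EXISTING_B");
    List<String> after = List.of("EXISTING_A", "EXISTING_B", "AIRBYTE_RAW_USERS");
    System.out.println(createdDuringTest(before, after)); // only these would be dropped
  }
}
```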

@AfterAll
static void cleanUp() {
db.stop();
db.close();
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/oracle.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You should now have all the requirements needed to configure Oracle as a destina
## Changelog
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.6 | 2021-08-23 | [#5542](https://github.com/airbytehq/airbyte/pull/5542) | Remove support for Oracle 11g to allow normalization |
| 0.1.5 | 2021-08-10 | [#5307](https://github.com/airbytehq/airbyte/pull/5307) | 🐛 Destination Oracle: Fix destination check for users without dba role |
| 0.1.4 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.1.3 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |