Skip to content

Commit

Permalink
Create jobs database tables without init container (#4942)
Browse files Browse the repository at this point in the history
* Refactor jobs and configs database initialization

* Add unit tests

* Format code

* Refactor code

* Update document

* Fix tests

* Add back init script to create db and user permission

* Remove old schema files

* Dry database instance implementations

* Revert unnecessary changes

* Rename resource directories

* Format code

* Add readme

* Move and rename database schema to jobs database schema

* Introduce table schema interface

* Rearrange packages

* Format code

* Address review comments

* Show more logs for acceptance test

* Do not depend on service uuid for db readiness
  • Loading branch information
tuliren authored Jul 26, 2021
1 parent 7fbf15c commit e8f20b2
Show file tree
Hide file tree
Showing 39 changed files with 764 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;

import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Configs;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,25 +104,19 @@ ConfigPersistence getDbPersistence(ConfigPersistence seedConfigPersistence) thro

DatabaseConfigPersistence dbConfigPersistence;
if (setupDatabase) {
// When we need to setup the database, it means the database will be initialized after
// we connect to the database. So the database itself is considered ready as long as
// the connection is alive.
Database database = Databases.createPostgresDatabaseWithRetry(
Database database = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl(),
Databases.IS_CONFIG_DATABASE_CONNECTED);
configs.getConfigDatabaseUrl())
.getAndInitialize();
dbConfigPersistence = new DatabaseConfigPersistence(database)
.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA))
.loadData(seedConfigPersistence);
} else {
// When we don't need to setup the database, it means the database is initialized
// somewhere else, and it is considered ready only when data has been loaded into it.
Database database = Databases.createPostgresDatabaseWithRetry(
Database database = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl(),
Databases.IS_CONFIG_DATABASE_LOADED_WITH_DATA);
configs.getConfigDatabaseUrl())
.getInitialized();
dbConfigPersistence = new DatabaseConfigPersistence(database);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.config.persistence.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.UPDATED_AT;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.select;

Expand Down Expand Up @@ -67,25 +67,6 @@ public DatabaseConfigPersistence(Database database) {
this.database = new ExceptionWrappingDatabase(database);
}

/**
 * Ensures the {@code airbyte_configs} table exists, creating it from the supplied DDL when it is
 * missing. The existence check and the DDL execution both run inside a single
 * {@code database.transaction} call. When {@code information_schema.tables} already lists a table
 * named {@code airbyte_configs}, nothing is executed.
 *
 * @param schema SQL script that is executed to create the table(s)
 * @return this persistence instance, so that calls can be chained
 * @throws IOException declared by the signature; presumably propagated from the database wrapper
 *         when the transaction fails (TODO confirm)
 */
public DatabaseConfigPersistence initialize(String schema) throws IOException {
  database.transaction(ctx -> {
    final boolean tableMissing = !ctx.fetchExists(
        select().from("information_schema.tables").where("table_name = 'airbyte_configs'"));
    if (tableMissing) {
      LOGGER.info("Config database has not been initialized");
      LOGGER.info("Creating tables with schema: {}", schema);
      ctx.execute(schema);
    }
    // The transaction callback has no meaningful result.
    return null;
  });
  return this;
}

/**
* Populate the {@code airbyte_configs} table with configs from the seed persistence. Only do so if
* the table is empty. Otherwise, we assume that it has been populated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.config.persistence.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.UPDATED_AT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -40,12 +39,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -92,8 +90,8 @@ public static void dbDown() {

@BeforeEach
public void setup() throws Exception {
database = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl());
database.transaction(ctx -> ctx.execute("DROP TABLE IF EXISTS airbyte_configs"));
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
database.transaction(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs"));
}

@AfterEach
Expand Down Expand Up @@ -130,10 +128,8 @@ public void testCreateDbPersistenceWithFileSeed() throws Exception {
@Test
public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception {
// Initialize the database with one config.
String schema = MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA);
Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
database.transaction(ctx -> {
ctx.execute(schema);
ctx.insertInto(AIRBYTE_CONFIGS)
.set(CONFIG_ID, SOURCE_GITHUB.getSourceDefinitionId().toString())
.set(CONFIG_TYPE, ConfigSchema.STANDARD_SOURCE_DEFINITION.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,26 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.config.persistence.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.DSL.select;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import java.sql.Timestamp;
import java.time.Instant;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.jooq.JSONB;
import org.jooq.Record1;
import org.jooq.Result;
import org.jooq.exception.DataAccessException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -87,9 +74,8 @@ public static void dbDown() {

@BeforeEach
public void setup() throws Exception {
database = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl());
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
configPersistence = new DatabaseConfigPersistence(database);
configPersistence.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA));
database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs"));
}

Expand All @@ -98,25 +84,6 @@ void tearDown() throws Exception {
database.close();
}

/**
 * Verifies that the {@code airbyte_configs} table and its expected columns can be queried, and
 * that calling {@code initialize} again once the table exists does not execute the schema passed
 * to it.
 */
@Test
public void testInitialize() throws Exception {
// The table must exist: selecting from a missing table makes the query throw.
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS)));
// Check each column. If any column does not exist, the query throws an exception; the
// comparison values are arbitrary and the boolean result is intentionally ignored.
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CONFIG_ID.eq("ID"))));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CONFIG_TYPE.eq("TYPE"))));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CONFIG_BLOB.eq(JSONB.valueOf("{}")))));
// One timestamp value is reused to probe both timestamp columns.
Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CREATED_AT.eq(timestamp))));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(UPDATED_AT.eq(timestamp))));

// Once airbyte_configs has been created, calling initialize again must not change anything,
// even when it is given a schema that would create a different table.
String testSchema = "CREATE TABLE IF NOT EXISTS airbyte_test_configs(id BIGINT PRIMARY KEY);";
configPersistence.initialize(testSchema);
// The airbyte_test_configs table must not exist, so querying it is expected to fail.
assertThrows(DataAccessException.class, () -> database.query(ctx -> ctx.fetchExists(select().from("airbyte_test_configs"))));
}

@Test
public void testLoadData() throws Exception {
ConfigPersistence seedPersistence = mock(ConfigPersistence.class);
Expand Down
2 changes: 1 addition & 1 deletion airbyte-db/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM postgres:13-alpine

COPY src/main/resources/schema.sql /docker-entrypoint-initdb.d/000_init.sql
COPY src/main/resources/init.sql /docker-entrypoint-initdb.d/000_init.sql
11 changes: 11 additions & 0 deletions airbyte-db/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# How to Create a New Database

See the `io.airbyte.db.instance.configs` package for an example.

- Create a new package under `io.airbyte.db.instance` with the name of the database.
- Create the database schema enum that defines all tables in the database.
- Write a SQL script that initializes the database.
  - The default path for this file is `resources/<db-name>_database/schema.sql`.
- Implement the `DatabaseInstance` interface that initializes the database by executing the initialization script.
- [Optional] For each table, create a constants class that defines the table and its columns in jOOQ.
  - This is necessary only if you plan to use jOOQ to query the table.
1 change: 1 addition & 0 deletions airbyte-db/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
api 'org.postgresql:postgresql:42.2.18'

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-json-validation')

testImplementation project(':airbyte-test-utils')

Expand Down
30 changes: 0 additions & 30 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

package io.airbyte.db;

import static org.jooq.impl.DSL.select;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
Expand All @@ -43,34 +41,6 @@ public class Databases {

private static final Logger LOGGER = LoggerFactory.getLogger(Databases.class);

/**
 * Readiness probe for the job database. The initialization SQL script writes a server UUID as
 * its final step, so the database counts as ready once that record is present. Any exception
 * raised while looking it up is reported as "not ready".
 */
public static final Function<Database, Boolean> IS_JOB_DATABASE_READY = db -> {
  try {
    return ServerUuid.get(db).isPresent();
  } catch (Exception ignored) {
    // A failed lookup is reported as "not ready" rather than propagated.
    return false;
  }
};
/**
 * Connectivity probe for the config database. It queries {@code information_schema.tables} and
 * returns the result of that existence check; any exception is logged and reported as
 * {@code false}.
 */
public static final Function<Database, Boolean> IS_CONFIG_DATABASE_CONNECTED = db -> {
  try {
    LOGGER.info("Testing config database connection...");
    final Boolean reachable = db.query(ctx -> ctx.fetchExists(select().from("information_schema.tables")));
    return reachable;
  } catch (Exception connectionFailure) {
    LOGGER.info("Unsuccessful connection to config database", connectionFailure);
    return false;
  }
};
/**
 * Probe that reports whether the {@code airbyte_configs} table can be queried and contains at
 * least one row. Any exception (for example, the table not existing yet) is reported as
 * {@code false} without logging.
 */
public static final Function<Database, Boolean> IS_CONFIG_DATABASE_LOADED_WITH_DATA = db -> {
  try {
    LOGGER.info("Testing if airbyte_configs has been created...");
    final Boolean hasRows = db.query(ctx -> ctx.fetchExists(select().from("airbyte_configs")));
    return hasRows;
  } catch (Exception ignored) {
    // A failed query is reported as "not loaded" rather than propagated.
    return false;
  }
};

/**
 * Creates a {@link Database} for PostgreSQL by delegating to {@code createDatabase} with the
 * {@code org.postgresql.Driver} driver class and the {@code SQLDialect.POSTGRES} dialect.
 *
 * @param username database user name
 * @param password password for {@code username}
 * @param jdbcConnectionString JDBC connection string passed through to {@code createDatabase}
 * @return the {@link Database} returned by {@code createDatabase}
 */
public static Database createPostgresDatabase(String username, String password, String jdbcConnectionString) {
return createDatabase(username, password, jdbcConnectionString, "org.postgresql.Driver", SQLDialect.POSTGRES);
}
Expand Down
Loading

0 comments on commit e8f20b2

Please sign in to comment.