Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a test container for running Airbyte with docker-compose #4970

Merged
merged 9 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class AirbyteApiClient {
private final SourceApi sourceApi;
private final SourceDefinitionSpecificationApi sourceDefinitionSpecificationApi;
private final WorkspaceApi workspaceApi;
private final HealthApi healthApi;

public AirbyteApiClient(ApiClient apiClient) {
connectionApi = new ConnectionApi(apiClient);
Expand All @@ -64,6 +65,7 @@ public AirbyteApiClient(ApiClient apiClient) {
sourceApi = new SourceApi(apiClient);
sourceDefinitionSpecificationApi = new SourceDefinitionSpecificationApi(apiClient);
workspaceApi = new WorkspaceApi(apiClient);
healthApi = new HealthApi(apiClient);
}

public ConnectionApi getConnectionApi() {
Expand Down Expand Up @@ -110,4 +112,8 @@ public OperationApi getOperationApi() {
return operationApi;
}

public HealthApi getHealthApi() {
return healthApi;
}

}
23 changes: 22 additions & 1 deletion airbyte-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,43 @@ configurations {
}

dependencies {
implementation project(':airbyte-api')

implementation 'org.testcontainers:testcontainers:1.15.3'

acceptanceTestsImplementation project(':airbyte-api')
acceptanceTestsImplementation project(':airbyte-commons')
acceptanceTestsImplementation project(':airbyte-config:persistence')
acceptanceTestsImplementation project(':airbyte-db')
acceptanceTestsImplementation project(':airbyte-tests')
acceptanceTestsImplementation project(':airbyte-test-utils')

acceptanceTestsImplementation 'com.fasterxml.jackson.core:jackson-databind'
acceptanceTestsImplementation 'io.github.cdimascio:java-dotenv:3.0.0'
acceptanceTestsImplementation 'org.apache.commons:commons-csv:1.4'
acceptanceTestsImplementation 'org.testcontainers:postgresql:1.15.1'
acceptanceTestsImplementation 'org.postgresql:postgresql:42.2.18'
acceptanceTestsImplementation 'com.fasterxml.jackson.core:jackson-databind'

automaticMigrationAcceptanceTestImplementation project(':airbyte-api')
automaticMigrationAcceptanceTestImplementation project(':airbyte-commons')
automaticMigrationAcceptanceTestImplementation project(':airbyte-tests')

automaticMigrationAcceptanceTestImplementation 'org.testcontainers:testcontainers:1.15.3'
}

// test should run using the current version of the docker compose configuration.
task copyComposeFileForAcceptanceTests(type: Copy) {
from "${rootDir}/docker-compose.yaml"
into "${sourceSets.acceptanceTests.output.resourcesDir}"
}
task copyComposeFileForMigrationAcceptanceTests(type: Copy) {
from "${rootDir}/docker-compose.yaml"
into "${sourceSets.automaticMigrationAcceptanceTest.output.resourcesDir}"
}

assemble.dependsOn(project.tasks.copyComposeFileForAcceptanceTests)
assemble.dependsOn(project.tasks.copyComposeFileForMigrationAcceptanceTests)

task acceptanceTests(type: Test) {
testClassesDirs += sourceSets.acceptanceTests.output.classesDirs
classpath += sourceSets.acceptanceTests.runtimeClasspath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.io.Resources;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.JobsApi;
import io.airbyte.api.client.invoker.ApiClient;
Expand Down Expand Up @@ -91,10 +91,17 @@
import io.airbyte.config.persistence.PersistenceConstants;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.test.airbyte_test_container.AirbyteTestContainer;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.io.File;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -123,7 +130,7 @@
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;

@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "ConstantConditions"})
// We order tests such that earlier tests test more basic behavior that is relied upon in later
// tests.
// e.g. We test that we can create a destination before we test whether we can sync data to it.
Expand All @@ -137,6 +144,10 @@ public class AcceptanceTests {

private static final Logger LOGGER = LoggerFactory.getLogger(AcceptanceTests.class);

@SuppressWarnings("UnstableApiUsage")
private static final URL DOCKER_COMPOSE_FILE_URL = Resources.getResource("docker-compose.yaml");
private static final File ENV_FILE = Path.of(System.getProperty("user.dir")).getParent().resolve(".env").toFile();

private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.0";
private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.0";

Expand All @@ -156,6 +167,7 @@ public class AcceptanceTests {

private static PostgreSQLContainer sourcePsql;
private static PostgreSQLContainer destinationPsql;
private static AirbyteTestContainer airbyteTestContainer;

private AirbyteApiClient apiClient;

Expand All @@ -165,16 +177,38 @@ public class AcceptanceTests {
private List<UUID> operationIds;

@BeforeAll
public static void init() {
public static void init() throws URISyntaxException, IOException, InterruptedException {
sourcePsql = new PostgreSQLContainer("postgres:13-alpine")
.withUsername(SOURCE_USERNAME)
.withPassword(SOURCE_PASSWORD);
sourcePsql.start();

// by default use airbyte deployment governed by a test container.
if (System.getenv("USE_EXTERNAL_DEPLOYMENT") == null || System.getenv("USE_EXTERNAL_DEPLOYMENT").equals("FALSE")) {
LOGGER.info("Using deployment of airbyte managed by test containers.");
airbyteTestContainer = new AirbyteTestContainer.Builder(new File(DOCKER_COMPOSE_FILE_URL.toURI()))
.setEnv(ENV_FILE)
// override env VERSION to use dev to test current build of airbyte.
.setEnvVariable("VERSION", "dev")
// override to use test mounts.
.setEnvVariable("DATA_DOCKER_MOUNT", "airbyte_data_migration_test")
.setEnvVariable("DB_DOCKER_MOUNT", "airbyte_db_migration_test")
.setEnvVariable("WORKSPACE_DOCKER_MOUNT", "airbyte_workspace_migration_test")
.setEnvVariable("LOCAL_ROOT", "/tmp/airbyte_local_migration_test")
.setEnvVariable("LOCAL_DOCKER_MOUNT", "/tmp/airbyte_local_migration_test")
.build();
airbyteTestContainer.start();
} else {
LOGGER.info("Using external deployment of airbyte.");
}
}

@AfterAll
public static void end() {
sourcePsql.stop();
if (airbyteTestContainer != null) {
airbyteTestContainer.stop();
}
}

@BeforeEach
Expand Down Expand Up @@ -260,7 +294,7 @@ public void testCreateDestination() throws ApiException {
final UUID destinationDefId = getDestinationDefId();
final JsonNode destinationConfig = getDestinationDbConfig();
final UUID workspaceId = PersistenceConstants.DEFAULT_WORKSPACE_ID;
final String name = "AccTestDestinationDb-" + UUID.randomUUID().toString();
final String name = "AccTestDestinationDb-" + UUID.randomUUID();

final DestinationRead createdDestination = createDestination(
name,
Expand Down Expand Up @@ -373,12 +407,12 @@ public void testCreateConnection() throws ApiException {
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);
final UUID destinationId = createDestination().getDestinationId();
final UUID operationId = createOperation().getOperationId();
final String name = "test-connection-" + UUID.randomUUID().toString();
final String name = "test-connection-" + UUID.randomUUID();
final ConnectionSchedule schedule = new ConnectionSchedule().timeUnit(MINUTES).units(100L);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final ConnectionRead createdConnection = createConnection(name, sourceId, destinationId, List.of(operationId), catalog, schedule, syncMode);
final ConnectionRead createdConnection = createConnection(name, sourceId, destinationId, List.of(operationId), catalog, schedule);

assertEquals(sourceId, createdConnection.getSourceId());
assertEquals(destinationId, createdConnection.getDestinationId());
Expand All @@ -401,7 +435,7 @@ public void testManualSync() throws Exception {
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null, syncMode).getConnectionId();
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
Expand Down Expand Up @@ -431,7 +465,7 @@ public void testIncrementalSync() throws Exception {
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null, syncMode).getConnectionId();
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -492,7 +526,7 @@ public void testScheduledSync() throws Exception {
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, connectionSchedule, syncMode);
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, connectionSchedule);

// When a new connection is created, Airbyte might sync it immediately (before the sync interval).
// Then it will wait the sync interval.
Expand Down Expand Up @@ -521,7 +555,7 @@ public void testMultipleSchemasAndTablesSync() throws Exception {
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null, syncMode).getConnectionId();
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
Expand All @@ -546,7 +580,7 @@ public void testMultipleSchemasSameTablesSync() throws Exception {
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null, syncMode).getConnectionId();
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
Expand All @@ -571,7 +605,7 @@ public void testIncrementalDedupeSync() throws Exception {
.destinationSyncMode(destinationSyncMode)
.primaryKey(List.of(List.of(COLUMN_NAME))));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null, syncMode).getConnectionId();
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// sync from start
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
Expand Down Expand Up @@ -651,7 +685,7 @@ public void testCheckpointing() throws Exception {
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null, syncMode).getConnectionId();
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -681,8 +715,9 @@ public void testCheckpointing() throws Exception {
matches = "true")
public void testRedactionOfSensitiveRequestBodies() throws Exception {
// check that the source password is not present in the logs
final List<String> serverLogLines = Files.readLines(
apiClient.getLogsApi().getLogs(new LogsRequestBody().logType(LogType.SERVER)), Charset.defaultCharset());
final List<String> serverLogLines = Files.readAllLines(
apiClient.getLogsApi().getLogs(new LogsRequestBody().logType(LogType.SERVER)).toPath(),
Charset.defaultCharset());

assertTrue(serverLogLines.size() > 0);

Expand Down Expand Up @@ -742,7 +777,7 @@ public void testBackpressure() throws Exception {
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);

final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null, SyncMode.FULL_REFRESH)
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
Expand Down Expand Up @@ -814,7 +849,7 @@ private Set<SchemaTableNamePair> listAllTables(Database database) throws SQLExce
}

private Set<SchemaTableNamePair> addAirbyteGeneratedTables(boolean withScdTable, Set<SchemaTableNamePair> sourceTables) {
final Set<SchemaTableNamePair> sourceTablesWithRawTablesAdded = sourceTables.stream().flatMap(x -> {
return sourceTables.stream().flatMap(x -> {
final String cleanedNameStream = x.tableName.replace(".", "_");
final List<SchemaTableNamePair> explodedStreamNames = new ArrayList<>(List.of(
new SchemaTableNamePair(OUTPUT_NAMESPACE_PREFIX + x.schemaName,
Expand All @@ -826,7 +861,6 @@ private Set<SchemaTableNamePair> addAirbyteGeneratedTables(boolean withScdTable,
}
return explodedStreamNames.stream();
}).collect(Collectors.toSet());
return sourceTablesWithRawTablesAdded;
}

private void assertRawDestinationContains(List<JsonNode> sourceRecords, SchemaTableNamePair pair) throws Exception {
Expand Down Expand Up @@ -865,8 +899,7 @@ private ConnectionRead createConnection(String name,
UUID destinationId,
List<UUID> operationIds,
AirbyteCatalog catalog,
ConnectionSchedule schedule,
SyncMode syncMode)
ConnectionSchedule schedule)
throws ApiException {
final ConnectionRead connection = apiClient.getConnectionApi().createConnection(
new ConnectionCreate()
Expand All @@ -886,7 +919,7 @@ private ConnectionRead createConnection(String name,

private DestinationRead createDestination() throws ApiException {
return createDestination(
"AccTestDestination-" + UUID.randomUUID().toString(),
"AccTestDestination-" + UUID.randomUUID(),
PersistenceConstants.DEFAULT_WORKSPACE_ID,
getDestinationDefId(),
getDestinationDbConfig());
Expand Down Expand Up @@ -1011,7 +1044,7 @@ private JsonNode getDbConfig(PostgreSQLContainer psql, boolean hiddenPassword, b

private SourceRead createPostgresSource() throws ApiException {
return createSource(
"acceptanceTestDb-" + UUID.randomUUID().toString(),
"acceptanceTestDb-" + UUID.randomUUID(),
PersistenceConstants.DEFAULT_WORKSPACE_ID,
getPostgresSourceDefinitionId(),
getSourceDbConfig());
Expand All @@ -1030,7 +1063,7 @@ private SourceRead createSource(String name, UUID workspaceId, UUID sourceDefId,
private UUID getPostgresSourceDefinitionId() throws ApiException {
return apiClient.getSourceDefinitionApi().listSourceDefinitions().getSourceDefinitions()
.stream()
.filter(sourceRead -> sourceRead.getName().toLowerCase().equals("postgres"))
.filter(sourceRead -> sourceRead.getName().equalsIgnoreCase("postgres"))
.findFirst()
.orElseThrow()
.getSourceDefinitionId();
Expand Down Expand Up @@ -1074,6 +1107,7 @@ private static void waitForSuccessfulJob(JobsApi jobsApi, JobRead originalJob) t
assertEquals(JobStatus.SUCCEEDED, job.getStatus());
}

@SuppressWarnings("BusyWait")
private static JobRead waitForJob(JobsApi jobsApi, JobRead originalJob, Set<JobStatus> jobStatuses) throws InterruptedException, ApiException {
JobRead job = originalJob;
int count = 0;
Expand All @@ -1087,6 +1121,7 @@ private static JobRead waitForJob(JobsApi jobsApi, JobRead originalJob, Set<JobS
return job;
}

@SuppressWarnings("BusyWait")
private static ConnectionState waitForConnectionState(AirbyteApiClient apiClient, UUID connectionId) throws ApiException, InterruptedException {
ConnectionState connectionState = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
int count = 0;
Expand Down
Loading