Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try removing concurrency for source-mssql tests #35125

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.exception.DataAccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.JdbcDatabaseContainer;
Expand All @@ -51,12 +55,30 @@ abstract public class TestDatabase<C extends JdbcDatabaseContainer<?>, T extends
final private ArrayList<String> cleanupSQL = new ArrayList<>();
final private Map<String, String> connectionProperties = new HashMap<>();

private DataSource dataSource;
private DSLContext dslContext;
private volatile DataSource dataSource;
private volatile DSLContext dslContext;

protected final int databaseId;
private static final AtomicInteger nextDatabaseId = new AtomicInteger(0);

protected final int containerId;
private static final AtomicInteger nextContainerId = new AtomicInteger(0);
private static final Map<String, Integer> containerUidToId = new ConcurrentHashMap<>();

@SuppressWarnings("this-escape")
protected TestDatabase(C container) {
this.container = container;
this.suffix = Strings.addRandomSuffix("", "_", 10);
this.databaseId = nextDatabaseId.getAndIncrement();
this.containerId = containerUidToId.computeIfAbsent(container.getContainerId(), k -> nextContainerId.getAndIncrement());
LOGGER.info(formatLogLine("creating database " + getDatabaseName()));
}

private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

protected String formatLogLine(String logLine) {
String retVal = "SGX TestDatabase databaseId=" + databaseId + ", containerId=" + containerId + " - " + logLine;
return retVal;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -87,7 +109,7 @@ public T onClose(String fmtSql, Object... fmtArgs) {
* Executes a SQL statement after calling String.format on the arguments.
*/
public T with(String fmtSql, Object... fmtArgs) {
execSQL(Stream.of(String.format(fmtSql, fmtArgs)));
execSQL(List.of(String.format(fmtSql, fmtArgs)));
return self();
}

Expand All @@ -113,9 +135,9 @@ final public boolean isInitialized() {
return dslContext != null;
}

abstract protected Stream<Stream<String>> inContainerBootstrapCmd();
abstract protected List<List<String>> inContainerBootstrapCmd();

abstract protected Stream<String> inContainerUndoBootstrapCmd();
abstract protected List<String> inContainerUndoBootstrapCmd();

abstract public DatabaseDriver getDatabaseDriver();

Expand Down Expand Up @@ -167,32 +189,35 @@ public Database getDatabase() {
return new Database(getDslContext());
}

protected void execSQL(final Stream<String> sql) {
protected void execSQL(final List<String> sqls) {
try {
getDatabase().query(ctx -> {
sql.forEach(statement -> {
LOGGER.debug("{}", statement);
ctx.execute(statement);
});
return null;
});
} catch (SQLException e) {
for (String sql : sqls) {
LOGGER.info(formatLogLine("executing SQL: " + sql));
try {
getDslContext().execute(sql);
LOGGER.info(formatLogLine("completed SQL: " + sql));
} catch (Throwable t) {
LOGGER.error(formatLogLine("error when executing SQL: " + sql + "e=\n" + t));
throw t;
}
}
} catch (DataAccessException e) {
throw new RuntimeException(e);
}
}

protected void execInContainer(Stream<String> cmds) {
final List<String> cmd = cmds.toList();
protected void execInContainer(List<String> cmd) {
if (cmd.isEmpty()) {
return;
}
try {
LOGGER.debug("executing {}", Strings.join(cmd, " "));
LOGGER.info(formatLogLine(String.format("executing command %s", Strings.join(cmd, " "))));
final var exec = getContainer().execInContainer(cmd.toArray(new String[0]));
if (exec.getExitCode() == 0) {
LOGGER.debug("execution success\nstdout:\n{}\nstderr:\n{}", exec.getStdout(), exec.getStderr());
LOGGER.info(formatLogLine(String.format("execution success\nstdout:\n%s\nstderr:\n%s", exec.getStdout(), exec.getStderr())));
} else {
LOGGER.error("execution failure, code {}\nstdout:\n{}\nstderr:\n{}", exec.getExitCode(), exec.getStdout(), exec.getStderr());
LOGGER.error(formatLogLine(
String.format("execution failure, code %s\nstdout:\n%s\nstderr:\n%s", exec.getExitCode(), exec.getStdout(), exec.getStderr())));
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -232,8 +257,9 @@ public B integrationTestConfigBuilder() {

@Override
public void close() {
execSQL(this.cleanupSQL.stream());
execSQL(this.cleanupSQL);
execInContainer(inContainerUndoBootstrapCmd());
LOGGER.info("closing database databaseId=" + databaseId);
}

static public class ConfigBuilder<T extends TestDatabase<?, ?, ?>, B extends ConfigBuilder<T, B>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
Expand All @@ -20,10 +21,10 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import java.sql.JDBCType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -140,24 +141,27 @@ public BareBonesTestDatabase(PostgreSQLContainer<?> container) {
}

@Override
protected Stream<Stream<String>> inContainerBootstrapCmd() {
final var sql = Stream.of(
protected List<List<String>> inContainerBootstrapCmd() {
final var sqls = List.of(
String.format("CREATE DATABASE %s", getDatabaseName()),
String.format("CREATE USER %s PASSWORD '%s'", getUserName(), getPassword()),
String.format("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", getDatabaseName(), getUserName()),
String.format("ALTER USER %s WITH SUPERUSER", getUserName()));
return Stream.of(Stream.concat(
Stream.of("psql",
"-d", getContainer().getDatabaseName(),
"-U", getContainer().getUsername(),
"-v", "ON_ERROR_STOP=1",
"-a"),
sql.flatMap(stmt -> Stream.of("-c", stmt))));
List<String> cmd = Lists.newArrayList("psql",
"-d", getContainer().getDatabaseName(),
"-U", getContainer().getUsername(),
"-v", "ON_ERROR_STOP=1",
"-a");
for (String sql : sqls) {
cmd.add("-c");
cmd.add(sql);
}
return List.of(cmd);
}

@Override
protected Stream<String> inContainerUndoBootstrapCmd() {
return Stream.empty();
protected List<String> inContainerUndoBootstrapCmd() {
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Collections;
import java.util.Iterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -289,7 +288,7 @@ void testUnicodeNull() {
}

@Test
@DisplayName("When initial cursor is null, and emit state for every record")
// When initial cursor is null, and emit state for every record
void testStateEmissionFrequency1() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
Expand Down Expand Up @@ -320,7 +319,7 @@ void testStateEmissionFrequency1() {
}

@Test
@DisplayName("When initial cursor is null, and emit state for every 2 records")
// When initial cursor is null, and emit state for every 2 records
void testStateEmissionFrequency2() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
Expand All @@ -346,7 +345,7 @@ void testStateEmissionFrequency2() {
}

@Test
@DisplayName("When initial cursor is not null")
// When initial cursor is not null
void testStateEmissionWhenInitialCursorIsNotNull() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
Expand Down Expand Up @@ -396,7 +395,7 @@ void testStateEmissionWhenInitialCursorIsNotNull() {
* <a href="https://github.com/airbytehq/airbyte/issues/15427">link</a>
*/
@Test
@DisplayName("When there are multiple records with the same cursor value")
// When there are multiple records with the same cursor value
void testStateEmissionForRecordsSharingSameCursorValue() {

messageIterator = MoreIterators.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -327,7 +326,7 @@ protected void assertExpectedRecords(final Set<JsonNode> expectedRecords,
}

@Test
@DisplayName("On the first sync, produce returns records that exist in the database.")
// On the first sync, produce returns records that exist in the database.
void testExistingData() throws Exception {
final CdcTargetPosition targetPosition = cdcLatestTargetPosition();
final AutoCloseableIterator<AirbyteMessage> read = source().read(config(), getConfiguredCatalog(), null);
Expand All @@ -351,7 +350,7 @@ protected void compareTargetPositionFromTheRecordsWithTargetPostionGeneratedBefo
}

@Test
@DisplayName("When a record is deleted, produces a deletion record.")
// When a record is deleted, produces a deletion record.
void testDelete() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source()
.read(config(), getConfiguredCatalog(), null);
Expand Down Expand Up @@ -379,7 +378,7 @@ protected void assertExpectedStateMessagesFromIncrementalSync(final List<Airbyte
}

@Test
@DisplayName("When a record is updated, produces an update record.")
// When a record is updated, produces an update record.
void testUpdate() throws Exception {
final String updatedModel = "Explorer";
final AutoCloseableIterator<AirbyteMessage> read1 = source()
Expand All @@ -406,7 +405,8 @@ void testUpdate() throws Exception {

@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
@Test
@DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.")
// Verify that when data is inserted into the database while a sync is happening and after the first
// sync, it all gets replicated.
protected void testRecordsProducedDuringAndAfterSync() throws Exception {

final int recordsToCreate = 20;
Expand Down Expand Up @@ -472,7 +472,8 @@ protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(f
}

@Test
@DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
// When both incremental CDC and full refresh are configured for different streams in a sync, the
// data is replicated as expected.
void testCdcAndFullRefreshInSameSync() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(getConfiguredCatalog());

Expand Down Expand Up @@ -545,7 +546,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
}

@Test
@DisplayName("When no records exist, no records are returned.")
// When no records exist, no records are returned.
void testNoData() throws Exception {

deleteCommand(MODELS_STREAM_NAME);
Expand All @@ -563,7 +564,7 @@ protected void assertExpectedStateMessagesForNoData(final List<AirbyteStateMessa
}

@Test
@DisplayName("When no changes have been made to the database since the previous sync, no records are returned.")
// When no changes have been made to the database since the previous sync, no records are returned.
void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source()
.read(config(), getConfiguredCatalog(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static org.mockito.Mockito.when;

import java.util.concurrent.Callable;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
Expand All @@ -28,7 +27,7 @@ public class AirbyteApiClientTest {
class RetryWithJitter {

@Test
@DisplayName("Should not retry on success")
// Should not retry on success
void ifSucceedShouldNotRetry() throws Exception {
mockCallable = mock(Callable.class);
when(mockCallable.call()).thenReturn("Success!");
Expand All @@ -39,7 +38,7 @@ void ifSucceedShouldNotRetry() throws Exception {
}

@Test
@DisplayName("Should retry up to the configured max retries on continued errors")
// Should retry up to the configured max retries on continued errors
void onlyRetryTillMaxRetries() throws Exception {
mockCallable = mock(Callable.class);
when(mockCallable.call()).thenThrow(new RuntimeException("Bomb!"));
Expand All @@ -51,7 +50,7 @@ void onlyRetryTillMaxRetries() throws Exception {
}

@Test
@DisplayName("Should retry only if there are errors")
// Should retry only if there are errors
void onlyRetryOnErrors() throws Exception {
mockCallable = mock(Callable.class);
// Because we succeed on the second try, we should only call the method twice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.MDC;

Expand All @@ -23,7 +22,7 @@ void init() {
}

@Test
@DisplayName("The MDC context is properly overrided")
// The MDC context is properly overrided
void testMDCModified() {
try (final MdcScope ignored = new MdcScope(modificationInMDC)) {
final Map<String, String> mdcState = MDC.getCopyOfContextMap();
Expand All @@ -34,7 +33,7 @@ void testMDCModified() {
}

@Test
@DisplayName("The MDC context is properly restored")
// The MDC context is properly restored
void testMDCRestore() {
try (final MdcScope ignored = new MdcScope(modificationInMDC)) {}

Expand Down
Loading
Loading