Skip to content

Commit

Permalink
improve logging in MsSQLTestDatabase
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Feb 24, 2024
1 parent 2d0c49f commit d21b594
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.19.0'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,6 +70,7 @@ static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerMod

public MsSQLTestDatabase(final MSSQLServerContainer<?> container) {
super(container);
LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName());
}

public MsSQLTestDatabase withCdc() {
Expand Down Expand Up @@ -103,39 +106,39 @@ public MsSQLTestDatabase withShortenedCapturePollingInterval() {

private void waitForAgentState(final boolean running) {
final String expectedValue = running ? "Running." : "Stopped.";
LOGGER.debug("Waiting for SQLServerAgent state to change to '{}'.", expectedValue);
LOGGER.info(formatLogLine("Waiting for SQLServerAgent state to change to '{}'."), expectedValue);
for (int i = 0; i < MAX_RETRIES; i++) {
try {
final var r = query(ctx -> ctx.fetch("EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';").get(0));
if (expectedValue.equalsIgnoreCase(r.getValue(0).toString())) {
LOGGER.debug("SQLServerAgent state is '{}', as expected.", expectedValue);
LOGGER.info(formatLogLine("SQLServerAgent state is '{}', as expected."), expectedValue);
return;
}
LOGGER.debug("Retrying, SQLServerAgent state {} does not match expected '{}'.", r, expectedValue);
LOGGER.info(formatLogLine("Retrying, SQLServerAgent state {} does not match expected '{}'."), r, expectedValue);
} catch (final SQLException e) {
LOGGER.debug("Retrying agent state query after catching exception {}.", e.getMessage());
LOGGER.info(formatLogLine("Retrying agent state query after catching exception {}."), e.getMessage());
}
try {
Thread.sleep(1_000); // Wait one second between retries.
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
throw new RuntimeException("Exhausted retry attempts while polling for agent state");
throw new RuntimeException(formatLogLine("Exhausted retry attempts while polling for agent state"));
}

public MsSQLTestDatabase withWaitUntilMaxLsnAvailable() {
LOGGER.debug("Waiting for max LSN to become available for database {}.", getDatabaseName());
LOGGER.info(formatLogLine("Waiting for max LSN to become available for database {}."), getDatabaseName());
for (int i = 0; i < MAX_RETRIES; i++) {
try {
final var maxLSN = query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn();").get(0).get(0, byte[].class));
if (maxLSN != null) {
LOGGER.debug("Max LSN available for database {}: {}", getDatabaseName(), Lsn.valueOf(maxLSN));
LOGGER.info(formatLogLine("Max LSN available for database {}: {}"), getDatabaseName(), Lsn.valueOf(maxLSN));
return self();
}
LOGGER.debug("Retrying, max LSN still not available for database {}.", getDatabaseName());
LOGGER.info(formatLogLine("Retrying, max LSN still not available for database {}."), getDatabaseName());
} catch (final SQLException e) {
LOGGER.warn("Retrying max LSN query after catching exception {}", e.getMessage());
LOGGER.info(formatLogLine("Retrying max LSN query after catching exception {}"), e.getMessage());
}
try {
Thread.sleep(1_000); // Wait one second between retries.
Expand All @@ -157,10 +160,10 @@ public String getJdbcUrl() {
}

@Override
protected Stream<Stream<String>> inContainerBootstrapCmd() {
return Stream.of(
mssqlCmd(Stream.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmd(Stream.of(
protected List<List<String>> inContainerBootstrapCmd() {
return List.of(
mssqlCmd(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmd(List.of(
String.format("USE %s", getDatabaseName()),
String.format("CREATE LOGIN %s WITH PASSWORD = '%s', DEFAULT_DATABASE = %s", getUserName(), getPassword(), getDatabaseName()),
String.format("ALTER SERVER ROLE [sysadmin] ADD MEMBER %s", getUserName()),
Expand All @@ -174,22 +177,22 @@ protected Stream<Stream<String>> inContainerBootstrapCmd() {
* aren't really worth it.
*/
@Override
protected Stream<String> inContainerUndoBootstrapCmd() {
return Stream.empty();
protected List<String> inContainerUndoBootstrapCmd() {
return Collections.emptyList();
}

public void dropDatabaseAndUser() {
execInContainer(mssqlCmd(Stream.of(
execInContainer(mssqlCmd(List.of(
String.format("USE master"),
String.format("ALTER DATABASE %s SET single_user WITH ROLLBACK IMMEDIATE", getDatabaseName()),
String.format("DROP DATABASE %s", getDatabaseName()))));
}

public Stream<String> mssqlCmd(final Stream<String> sql) {
return Stream.of("/opt/mssql-tools/bin/sqlcmd",
public List<String> mssqlCmd(final List<String> sql) {
return List.of("/opt/mssql-tools/bin/sqlcmd",
"-U", getContainer().getUsername(),
"-P", getContainer().getPassword(),
"-Q", sql.collect(Collectors.joining("; ")),
"-Q", StringUtils.join("; "),
"-b", "-e");
}

Expand Down

0 comments on commit d21b594

Please sign in to comment.