Skip to content

Commit

Permalink
DBZ-2637 Enhance Postgres validation checks
Browse files Browse the repository at this point in the history
+ add wal_level config check
+ add LOGIN and REPLICATION roles check
+ add replication slot is already in use check
  • Loading branch information
rk3rn3r authored and jpechane committed Nov 16, 2020
1 parent 1dbe6c0 commit c59e00c
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,40 +75,78 @@ public ConfigDef config() {

@Override
public Config validate(Map<String, String> connectorConfigs) {
PostgresConnectorConfig config = new PostgresConnectorConfig(Configuration.from(connectorConfigs));
final PostgresConnectorConfig config = new PostgresConnectorConfig(Configuration.from(connectorConfigs));

// First, validate all of the individual fields, which is easy since don't make any of the fields invisible ...
Map<String, ConfigValue> results = config.validate();
final Map<String, ConfigValue> results = config.validate();

// Get the config values for each of the connection-related fields ...
ConfigValue hostnameValue = results.get(PostgresConnectorConfig.HOSTNAME.name());
ConfigValue portValue = results.get(PostgresConnectorConfig.PORT.name());
ConfigValue databaseValue = results.get(PostgresConnectorConfig.DATABASE_NAME.name());
ConfigValue userValue = results.get(PostgresConnectorConfig.USER.name());
ConfigValue passwordValue = results.get(PostgresConnectorConfig.PASSWORD.name());
final ConfigValue hostnameResult = results.get(PostgresConnectorConfig.HOSTNAME.name());
final ConfigValue portResult = results.get(PostgresConnectorConfig.PORT.name());
final ConfigValue databaseNameResult = results.get(PostgresConnectorConfig.DATABASE_NAME.name());
final ConfigValue userResult = results.get(PostgresConnectorConfig.USER.name());
final ConfigValue passwordResult = results.get(PostgresConnectorConfig.PASSWORD.name());
final ConfigValue slotNameResult = results.get(PostgresConnectorConfig.SLOT_NAME.name());
final ConfigValue pluginNameResult = results.get(PostgresConnectorConfig.PLUGIN_NAME.name());
final String passwordStringValue = config.getConfig().getString(PostgresConnectorConfig.PASSWORD);

if (Strings.isNullOrEmpty(passwordStringValue)) {
logger.warn("The connection password is empty");
}

// If there are no errors on any of these ...
if (hostnameValue.errorMessages().isEmpty()
&& portValue.errorMessages().isEmpty()
&& userValue.errorMessages().isEmpty()
&& passwordValue.errorMessages().isEmpty()
&& databaseValue.errorMessages().isEmpty()) {
if (hostnameResult.errorMessages().isEmpty()
&& portResult.errorMessages().isEmpty()
&& userResult.errorMessages().isEmpty()
&& passwordResult.errorMessages().isEmpty()
&& databaseNameResult.errorMessages().isEmpty()
&& slotNameResult.errorMessages().isEmpty()
&& pluginNameResult.errorMessages().isEmpty()) {
// Try to connect to the database ...
try (PostgresConnection connection = new PostgresConnection(config.jdbcConfig())) {
try {
// check connection
connection.execute("SELECT version()");
logger.info("Successfully tested connection for {} with user '{}'", connection.connectionString(),
connection.username());
// check server wal_level
final String walLevel = connection.queryAndMap(
"SHOW wal_level",
connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level"));
if (!"logical".equals(walLevel)) {
final String errorMessage = "Postgres server wal_level property must be \"logical\" but is: " + walLevel;
logger.error(errorMessage);
hostnameResult.addErrorMessage(errorMessage);
}
// check user for LOGIN and REPLICATION roles
if (!connection.queryAndMap(
"SELECT rolcanlogin, rolreplication FROM pg_roles WHERE rolname = current_user",
connection.singleResultMapper(rs -> rs.getBoolean("rolcanlogin") && rs.getBoolean("rolreplication"), "Could not fetch roles"))) {
final String errorMessage = "Postgres roles LOGIN and REPLICATION are not assigned to user: " + connection.username();
logger.error(errorMessage);
userResult.addErrorMessage(errorMessage);
}
// check replication slot
final String slotName = config.slotName();
if (connection.prepareQueryAndMap(
"SELECT * FROM pg_replication_slots WHERE slot_name = ?",
statement -> statement.setString(1, slotName),
rs -> {
if (rs.next()) {
return rs.getBoolean("active");
}
return false;
})) {
final String errorMessage = "Slot name \"" + slotName
+ "\" already exists and is active. Choose a unique name or stop the other process occupying the slot.";
logger.error(errorMessage);
slotNameResult.addErrorMessage(errorMessage);
}
}
catch (SQLException e) {
logger.info("Failed testing connection for {} with user '{}'", connection.connectionString(),
connection.username());
hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
logger.error("Failed testing connection for {} with user '{}': {}", connection.connectionString(),
connection.username(), e.getLocalizedMessage());
hostnameResult.addErrorMessage("Error while validating connector config: " + e.getLocalizedMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,15 @@ public void initConnection() throws SQLException, InterruptedException {

@Override
public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
// note that some of these options are only supported in pg94+, additionally
// the options are not yet exported by the jdbc api wrapper, therefore, we just do this ourselves
// but eventually this should be moved back to the jdbc API
// note that some of these options are only supported in Postgres 9.4+, additionally
// the options are not yet exported by the jdbc api wrapper, therefore, we just do
// this ourselves but eventually this should be moved back to the jdbc API
// see https://github.com/pgjdbc/pgjdbc/issues/1305

LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
String tempPart = "";
// Exported snapshots are supported in PostgreSQL 9.4+
Boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
// Exported snapshots are supported in Postgres 9.4+
boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
if ((dropSlotOnClose || exportSnapshot) && !canExportSnapshot) {
LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, " +
"but not on a supported version of Postgres, ignoring!");
Expand All @@ -350,8 +350,8 @@ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException
plugin.getPostgresPluginName());
LOGGER.info("Creating replication slot with command {}", createCommand);
stmt.execute(createCommand);
// when we are in pg94+, we can parse the slot creation info, otherwise, it returns
// nothing
// when we are in Postgres 9.4+, we can parse the slot creation info,
// otherwise, it returns nothing
if (canExportSnapshot) {
this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
}
Expand All @@ -375,11 +375,11 @@ private SlotCreationResult parseSlotCreation(ResultSet rs) {
return new SlotCreationResult(slotName, startPoint, snapName, pluginName);
}
else {
throw new ConnectException("expected response to create_replication_slot");
throw new ConnectException("No replication slot found");
}
}
catch (SQLException ex) {
throw new ConnectException("unable to parse create_replication_slot", ex);
throw new ConnectException("Unable to parse create_replication_slot response", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,10 +31,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
Expand Down Expand Up @@ -69,6 +73,7 @@
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
Expand Down Expand Up @@ -152,6 +157,54 @@ public void shouldValidateMinimalConfiguration() throws Exception {
configValue.errorMessages().isEmpty()));
}

@Test
public void shouldNotStartWithInvalidSlotConfigAndUserRoles() throws Exception {
// Start with a clean slate and create database objects
TestHelper.dropAllSchemas();
TestHelper.dropPublication();
TestHelper.dropDefaultReplicationSlot();
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute("CREATE USER badboy WITH PASSWORD 'failing';", "GRANT ALL PRIVILEGES ON DATABASE postgres TO badboy;");

Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME)
.build();

start(PostgresConnector.class, config);
waitForStreamingRunning();

Configuration failingConfig = TestHelper.defaultConfig()
.with("name", "failingPGConnector")
.with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER, "badboy")
.with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PASSWORD, "failing")
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME)
.build();
List<ConfigValue> validatedConfig = new PostgresConnector().validate(failingConfig.asMap()).configValues();
Map<String, ConfigValue> results = validatedConfig.stream().collect(Collectors.toMap(ConfigValue::name, o -> o));

assertFalse("Expected error on \"slot.name\" property did not happen!", results.get("slot.name").errorMessages().isEmpty());
assertFalse("Expected error on \"database.user\" property did not happen!", results.get("database.user").errorMessages().isEmpty());
assertEquals(
"Postgres roles LOGIN and REPLICATION are not assigned to user: badboy",
results.get("database.user").errorMessages().get(0));
assertEquals(
"Slot name \"" + ReplicationConnection.Builder.DEFAULT_SLOT_NAME
+ "\" already exists and is active. Choose a unique name or stop the other process occupying the slot.",
results.get("slot.name").errorMessages().get(0));

final List<String> invalidProperties = Arrays.asList("database.user", "slot.name");
validatedConfig.forEach(
configValue -> {
if (!invalidProperties.contains(configValue.name())) {
assertTrue("Unexpected error for \"" + configValue.name() + "\": " + configValue.errorMessages(), configValue.errorMessages().isEmpty());
}
});

stopConnector();
}

@Test
public void shouldValidateConfiguration() throws Exception {
// use an empty configuration which should be invalid because of the lack of DB connection details
Expand Down

0 comments on commit c59e00c

Please sign in to comment.