Skip to content

Commit

Permalink
GH-8680: Check DB for table on start (#8690)
Browse files Browse the repository at this point in the history
* GH-8680: Check DB for table on start

Fixes #8680

If database is not initialized properly before application start, we may lose messages
at runtime when we fail to insert data into DB

* Implement `SmartLifecycle` on `JdbcMessageStore`, `JdbcChannelMessageStore`,
`JdbcMetadataStore`, and `DefaultLockRepository` to perform `SELECT COUNT` query in `start()`
to fail fast if no required table is present.
* Refactor `AbstractJdbcChannelMessageStoreTests` into JUnit 5 and use `MySqlContainerTest`
for more coverage
* Fix newly failed tests which had DB not initialized before
* Exclude `commons-logging` from `commons-dbcp2` dependency to avoid
classpath conflict
* Document the new feature

* * Fix HTTP URL in the `DataSource-mysql-context.xml`

* Fix language in docs

Co-authored-by: Gary Russell <grussell@vmware.com>

* * Add `setCheckDatabaseOnStart(false)` to disable the check query for all the SI JDBC components

* Fix language in Javadocs

Co-authored-by: Gary Russell <grussell@vmware.com>

---------

Co-authored-by: Gary Russell <grussell@vmware.com>
  • Loading branch information
artembilan and garyrussell authored Jul 28, 2023
1 parent 0c7d40d commit 7dcc0bb
Show file tree
Hide file tree
Showing 25 changed files with 414 additions and 104 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,9 @@ project('spring-integration-jdbc') {
testImplementation "org.apache.derby:derbyclient:$derbyVersion"
testImplementation "org.postgresql:postgresql:$postgresVersion"
testImplementation "mysql:mysql-connector-java:$mysqlVersion"
testImplementation "org.apache.commons:commons-dbcp2:$commonsDbcp2Version"
testImplementation ("org.apache.commons:commons-dbcp2:$commonsDbcp2Version") {
exclude group: 'commons-logging'
}
testImplementation 'org.testcontainers:mysql'
testImplementation 'org.testcontainers:postgresql'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.sql.DataSource;

Expand All @@ -29,6 +30,8 @@
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
Expand All @@ -45,6 +48,12 @@
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
* for local synchronizations.
* <p>
* This class implements {@link SmartLifecycle} and calls
* {@code SELECT COUNT(REGION) FROM %sLOCK} query
* according to the provided prefix on {@link #start()} to check if required table is present in DB.
* The application context will fail to start if the table is not present.
* This check can be disabled via {@link #setCheckDatabaseOnStart(boolean)}.
*
* @author Dave Syer
* @author Artem Bilan
Expand All @@ -56,7 +65,10 @@
* @since 4.3
*/
public class DefaultLockRepository
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton,
SmartLifecycle {

private static final LogAccessor LOGGER = new LogAccessor(DefaultLockRepository.class);

/**
* Default value for the table prefix property.
Expand All @@ -72,6 +84,8 @@ public class DefaultLockRepository

private final JdbcTemplate template;

private final AtomicBoolean started = new AtomicBoolean();

private Duration ttl = DEFAULT_TTL;

private String prefix = DEFAULT_TABLE_PREFIX;
Expand Down Expand Up @@ -116,6 +130,10 @@ SELECT COUNT(REGION)
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
""";

private String countAllQuery = """
SELECT COUNT(REGION) FROM %sLOCK
""";

private ApplicationContext applicationContext;

private PlatformTransactionManager transactionManager;
Expand All @@ -126,6 +144,8 @@ SELECT COUNT(REGION)

private TransactionTemplate serializableTransactionTemplate;

private boolean checkDatabaseOnStart = true;

/**
* Constructor that initializes the client id that will be associated for
* all the locks persisted by the store instance to a random {@link UUID}.
Expand Down Expand Up @@ -293,6 +313,7 @@ public void afterPropertiesSet() {
this.insertQuery = String.format(this.insertQuery, this.prefix);
this.countQuery = String.format(this.countQuery, this.prefix);
this.renewQuery = String.format(this.renewQuery, this.prefix);
this.countAllQuery = String.format(this.countAllQuery, this.prefix);
}

@Override
Expand Down Expand Up @@ -325,6 +346,41 @@ public void afterSingletonsInstantiated() {
this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
}

/**
* The flag to perform a database check query on start or not.
* @param checkDatabaseOnStart false to not perform the database check.
* @since 6.2
*/
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) {
this.checkDatabaseOnStart = checkDatabaseOnStart;
if (!checkDatabaseOnStart) {
LOGGER.info("The 'DefaultLockRepository' won't be started automatically " +
"and required table is not going be checked.");
}
}

@Override
public boolean isAutoStartup() {
return this.checkDatabaseOnStart;
}

@Override
public void start() {
if (this.started.compareAndSet(false, true) && this.checkDatabaseOnStart) {
this.template.queryForObject(this.countAllQuery, Integer.class); // If no table in DB, an exception is thrown
}
}

@Override
public void stop() {
this.started.set(false);
}

@Override
public boolean isRunning() {
return this.started.get();
}

@Override
public void close() {
this.defaultTransactionTemplate.executeWithoutResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.springframework.integration.jdbc.metadata;

import java.util.concurrent.atomic.AtomicBoolean;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
Expand All @@ -34,14 +38,22 @@
* where <code>*</code> is the target database type.
* <p>
* The transaction management is required to use this {@link ConcurrentMetadataStore}.
* <p>
* This class implements {@link SmartLifecycle} and calls
* {@code SELECT COUNT(METADATA_KEY) FROM %sMETADATA_STORE} query
* according to the provided prefix on {@link #start()} to check if required table is present in DB.
* The application context will fail to start if the table is not present.
* This check can be disabled via {@link #setCheckDatabaseOnStart(boolean)}.
*
* @author Bojan Vukasovic
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*/
public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingBean {
public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {

private static final LogAccessor LOGGER = new LogAccessor(JdbcMetadataStore.class);

private static final String KEY_CANNOT_BE_NULL = "'key' cannot be null";

Expand All @@ -52,6 +64,8 @@ public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingB

private final JdbcOperations jdbcTemplate;

private final AtomicBoolean started = new AtomicBoolean();

private String tablePrefix = DEFAULT_TABLE_PREFIX;

private String region = "DEFAULT";
Expand Down Expand Up @@ -93,6 +107,12 @@ public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingB
HAVING COUNT(*)=0
""";

private String countQuery = """
SELECT COUNT(METADATA_KEY) FROM %sMETADATA_STORE
""";

private boolean checkDatabaseOnStart = true;

/**
* Instantiate a {@link JdbcMetadataStore} using provided dataSource {@link DataSource}.
* @param dataSource a {@link DataSource}
Expand Down Expand Up @@ -137,7 +157,7 @@ public void setRegion(String region) {
* Specify a row lock hint for the query in the lock-based operations.
* Defaults to {@code FOR UPDATE}. Can be specified as an empty string,
* if the target RDBMS doesn't support locking on tables from queries.
* The value depends from RDBMS vendor, e.g. SQL Server requires {@code WITH (ROWLOCK)}.
* The value depends on the RDBMS vendor, e.g. SQL Server requires {@code WITH (ROWLOCK)}.
* @param lockHint the RDBMS vendor-specific lock hint.
* @since 5.0.7
*/
Expand All @@ -154,6 +174,42 @@ public void afterPropertiesSet() {
this.replaceValueByKeyQuery = String.format(this.replaceValueByKeyQuery, this.tablePrefix);
this.removeValueQuery = String.format(this.removeValueQuery, this.tablePrefix);
this.putIfAbsentValueQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix, this.tablePrefix);
this.countQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix);
}

/**
* The flag to perform a database check query on start or not.
* @param checkDatabaseOnStart false to not perform the database check.
* @since 6.2
*/
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) {
this.checkDatabaseOnStart = checkDatabaseOnStart;
if (!checkDatabaseOnStart) {
LOGGER.info("The 'DefaultLockRepository' won't be started automatically " +
"and required table is not going be checked.");
}
}

@Override
public boolean isAutoStartup() {
return this.checkDatabaseOnStart;
}

@Override
public void start() {
if (this.started.compareAndSet(false, true) && this.checkDatabaseOnStart) {
this.jdbcTemplate.queryForObject(this.countQuery, Integer.class); // If no table in DB, an exception is thrown
}
}

@Override
public void stop() {
this.started.set(false);
}

@Override
public boolean isRunning() {
return this.started.get();
}

@Override
Expand All @@ -162,7 +218,7 @@ public String putIfAbsent(String key, String value) {
Assert.notNull(key, KEY_CANNOT_BE_NULL);
Assert.notNull(value, "'value' cannot be null");
while (true) {
//try to insert if does not exists
//try to insert if the entry does not exist
int affectedRows = tryToPutIfAbsent(key, value);
if (affectedRows > 0) {
//it was not in the table, so we have just inserted
Expand Down Expand Up @@ -218,7 +274,7 @@ public void put(String key, String value) {
Assert.notNull(key, KEY_CANNOT_BE_NULL);
Assert.notNull(value, "'value' cannot be null");
while (true) {
//try to insert if does not exist, if exists we will try to update it
//try to insert if the entry does not exist, if it exists we will try to update it
int affectedRows = tryToPutIfAbsent(key, value);
if (affectedRows == 0) {
//since value is not inserted, means it is already present
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -31,6 +32,7 @@
import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.log.LogMessage;
import org.springframework.core.serializer.Deserializer;
Expand Down Expand Up @@ -73,6 +75,11 @@
* The SQL scripts for creating the table are packaged
* under {@code org/springframework/integration/jdbc/schema-*.sql},
* where {@code *} denotes the target database type.
* <p>
* This class implements {@link SmartLifecycle} and calls {@link #getMessageGroupCount()}
* on {@link #start()} to check if required table is present in DB.
* The application context will fail to start if the table is not present.
* This check can be disabled via {@link #setCheckDatabaseOnStart(boolean)}.
*
* @author Gunnar Hillert
* @author Artem Bilan
Expand All @@ -83,7 +90,7 @@
* @since 2.2
*/
@ManagedResource
public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean {
public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {

private static final LogAccessor LOGGER = new LogAccessor(JdbcChannelMessageStore.class);

Expand Down Expand Up @@ -121,6 +128,8 @@ private enum Query {

private final Lock idCacheWriteLock = this.idCacheLock.writeLock();

private final AtomicBoolean started = new AtomicBoolean();

private ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider;

private String region = DEFAULT_REGION;
Expand All @@ -145,6 +154,8 @@ private enum Query {

private boolean priorityEnabled;

private boolean checkDatabaseOnStart = true;

/**
* Convenient constructor for configuration use.
*/
Expand Down Expand Up @@ -411,6 +422,41 @@ public void afterPropertiesSet() {
this.jdbcTemplate.afterPropertiesSet();
}

/**
* The flag to perform a database check query on start or not.
* @param checkDatabaseOnStart false to not perform the database check.
* @since 6.2
*/
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) {
this.checkDatabaseOnStart = checkDatabaseOnStart;
if (!checkDatabaseOnStart) {
LOGGER.info("The 'DefaultLockRepository' won't be started automatically " +
"and required table is not going be checked.");
}
}

@Override
public boolean isAutoStartup() {
return this.checkDatabaseOnStart;
}

@Override
public void start() {
if (this.started.compareAndSet(false, true) && this.checkDatabaseOnStart) {
getMessageGroupCount(); // If no table in DB, an exception is thrown
}
}

@Override
public void stop() {
this.started.set(false);
}

@Override
public boolean isRunning() {
return this.started.get();
}

/**
* Store a message in the database. The groupId identifies the channel for which
* the message is to be stored.
Expand Down
Loading

0 comments on commit 7dcc0bb

Please sign in to comment.