Skip to content

GH-8680: Check DB for table on start #8690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,9 @@ project('spring-integration-jdbc') {
testImplementation "org.apache.derby:derbyclient:$derbyVersion"
testImplementation "org.postgresql:postgresql:$postgresVersion"
testImplementation "mysql:mysql-connector-java:$mysqlVersion"
testImplementation "org.apache.commons:commons-dbcp2:$commonsDbcp2Version"
testImplementation ("org.apache.commons:commons-dbcp2:$commonsDbcp2Version") {
exclude group: 'commons-logging'
}
testImplementation 'org.testcontainers:mysql'
testImplementation 'org.testcontainers:postgresql'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.sql.DataSource;

Expand All @@ -29,6 +30,8 @@
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
Expand All @@ -45,6 +48,12 @@
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
* for local synchronizations.
* <p>
* This class implements {@link SmartLifecycle} and calls
* {@code SELECT COUNT(REGION) FROM %sLOCK} query
* according to the provided prefix on {@link #start()} to check if required table is present in DB.
* The application context will fail to start if the table is not present.
* This check can be disabled via {@link #setCheckDatabaseOnStart(boolean)}.
*
* @author Dave Syer
* @author Artem Bilan
Expand All @@ -56,7 +65,10 @@
* @since 4.3
*/
public class DefaultLockRepository
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton,
SmartLifecycle {

private static final LogAccessor LOGGER = new LogAccessor(DefaultLockRepository.class);

/**
* Default value for the table prefix property.
Expand All @@ -72,6 +84,8 @@ public class DefaultLockRepository

private final JdbcTemplate template;

private final AtomicBoolean started = new AtomicBoolean();

private Duration ttl = DEFAULT_TTL;

private String prefix = DEFAULT_TABLE_PREFIX;
Expand Down Expand Up @@ -116,6 +130,10 @@ SELECT COUNT(REGION)
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
""";

private String countAllQuery = """
SELECT COUNT(REGION) FROM %sLOCK
""";

private ApplicationContext applicationContext;

private PlatformTransactionManager transactionManager;
Expand All @@ -126,6 +144,8 @@ SELECT COUNT(REGION)

private TransactionTemplate serializableTransactionTemplate;

private boolean checkDatabaseOnStart = true;

/**
* Constructor that initializes the client id that will be associated for
* all the locks persisted by the store instance to a random {@link UUID}.
Expand Down Expand Up @@ -293,6 +313,7 @@ public void afterPropertiesSet() {
this.insertQuery = String.format(this.insertQuery, this.prefix);
this.countQuery = String.format(this.countQuery, this.prefix);
this.renewQuery = String.format(this.renewQuery, this.prefix);
this.countAllQuery = String.format(this.countAllQuery, this.prefix);
}

@Override
Expand Down Expand Up @@ -325,6 +346,41 @@ public void afterSingletonsInstantiated() {
this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
}

/**
* The flag to perform a database check query on start or not.
* @param checkDatabaseOnStart false to not perform the database check.
* @since 6.2
*/
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) {
this.checkDatabaseOnStart = checkDatabaseOnStart;
if (!checkDatabaseOnStart) {
LOGGER.info("The 'DefaultLockRepository' won't be started automatically " +
"and required table is not going be checked.");
}
}

@Override
public boolean isAutoStartup() {
return this.checkDatabaseOnStart;
}

@Override
public void start() {
if (this.started.compareAndSet(false, true) && this.checkDatabaseOnStart) {
this.template.queryForObject(this.countAllQuery, Integer.class); // If no table in DB, an exception is thrown
}
}

@Override
public void stop() {
this.started.set(false);
}

@Override
public boolean isRunning() {
return this.started.get();
}

@Override
public void close() {
this.defaultTransactionTemplate.executeWithoutResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.springframework.integration.jdbc.metadata;

import java.util.concurrent.atomic.AtomicBoolean;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
Expand All @@ -34,14 +38,22 @@
* where <code>*</code> is the target database type.
* <p>
* The transaction management is required to use this {@link ConcurrentMetadataStore}.
* <p>
* This class implements {@link SmartLifecycle} and calls
* {@code SELECT COUNT(METADATA_KEY) FROM %sMETADATA_STORE} query
* according to the provided prefix on {@link #start()} to check if required table is present in DB.
* The application context will fail to start if the table is not present.
* This check can be disabled via {@link #setCheckDatabaseOnStart(boolean)}.
*
* @author Bojan Vukasovic
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*/
public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingBean {
public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {

private static final LogAccessor LOGGER = new LogAccessor(JdbcMetadataStore.class);

private static final String KEY_CANNOT_BE_NULL = "'key' cannot be null";

Expand All @@ -52,6 +64,8 @@ public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingB

private final JdbcOperations jdbcTemplate;

private final AtomicBoolean started = new AtomicBoolean();

private String tablePrefix = DEFAULT_TABLE_PREFIX;

private String region = "DEFAULT";
Expand Down Expand Up @@ -93,6 +107,12 @@ public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingB
HAVING COUNT(*)=0
""";

private String countQuery = """
SELECT COUNT(METADATA_KEY) FROM %sMETADATA_STORE
""";

private boolean checkDatabaseOnStart = true;

/**
* Instantiate a {@link JdbcMetadataStore} using provided dataSource {@link DataSource}.
* @param dataSource a {@link DataSource}
Expand Down Expand Up @@ -137,7 +157,7 @@ public void setRegion(String region) {
* Specify a row lock hint for the query in the lock-based operations.
* Defaults to {@code FOR UPDATE}. Can be specified as an empty string,
* if the target RDBMS doesn't support locking on tables from queries.
* The value depends from RDBMS vendor, e.g. SQL Server requires {@code WITH (ROWLOCK)}.
* The value depends on the RDBMS vendor, e.g. SQL Server requires {@code WITH (ROWLOCK)}.
* @param lockHint the RDBMS vendor-specific lock hint.
* @since 5.0.7
*/
Expand All @@ -154,6 +174,42 @@ public void afterPropertiesSet() {
this.replaceValueByKeyQuery = String.format(this.replaceValueByKeyQuery, this.tablePrefix);
this.removeValueQuery = String.format(this.removeValueQuery, this.tablePrefix);
this.putIfAbsentValueQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix, this.tablePrefix);
this.countQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix);
}

/**
* The flag to perform a database check query on start or not.
* @param checkDatabaseOnStart false to not perform the database check.
* @since 6.2
*/
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) {
this.checkDatabaseOnStart = checkDatabaseOnStart;
if (!checkDatabaseOnStart) {
LOGGER.info("The 'DefaultLockRepository' won't be started automatically " +
"and required table is not going be checked.");
}
}

@Override
public boolean isAutoStartup() {
return this.checkDatabaseOnStart;
}

@Override
public void start() {
if (this.started.compareAndSet(false, true) && this.checkDatabaseOnStart) {
this.jdbcTemplate.queryForObject(this.countQuery, Integer.class); // If no table in DB, an exception is thrown
}
}

@Override
public void stop() {
this.started.set(false);
}

@Override
public boolean isRunning() {
return this.started.get();
}

@Override
Expand All @@ -162,7 +218,7 @@ public String putIfAbsent(String key, String value) {
Assert.notNull(key, KEY_CANNOT_BE_NULL);
Assert.notNull(value, "'value' cannot be null");
while (true) {
//try to insert if does not exists
//try to insert if the entry does not exist
int affectedRows = tryToPutIfAbsent(key, value);
if (affectedRows > 0) {
//it was not in the table, so we have just inserted
Expand Down Expand Up @@ -218,7 +274,7 @@ public void put(String key, String value) {
Assert.notNull(key, KEY_CANNOT_BE_NULL);
Assert.notNull(value, "'value' cannot be null");
while (true) {
//try to insert if does not exist, if exists we will try to update it
//try to insert if the entry does not exist, if it exists we will try to update it
int affectedRows = tryToPutIfAbsent(key, value);
if (affectedRows == 0) {
//since value is not inserted, means it is already present
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -31,6 +32,7 @@
import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.log.LogMessage;
import org.springframework.core.serializer.Deserializer;
Expand Down Expand Up @@ -73,6 +75,11 @@
* The SQL scripts for creating the table are packaged
* under {@code org/springframework/integration/jdbc/schema-*.sql},
* where {@code *} denotes the target database type.
* <p>
* This class implements {@link SmartLifecycle} and calls {@link #getMessageGroupCount()}
* on {@link #start()} to check if required table is present in DB.
* The application context will fail to start if the table is not present.
* This check can be disabled via {@link #setCheckDatabaseOnStart(boolean)}.
*
* @author Gunnar Hillert
* @author Artem Bilan
Expand All @@ -83,7 +90,7 @@
* @since 2.2
*/
@ManagedResource
public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean {
public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {

private static final LogAccessor LOGGER = new LogAccessor(JdbcChannelMessageStore.class);

Expand Down Expand Up @@ -121,6 +128,8 @@ private enum Query {

private final Lock idCacheWriteLock = this.idCacheLock.writeLock();

private final AtomicBoolean started = new AtomicBoolean();

private ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider;

private String region = DEFAULT_REGION;
Expand All @@ -145,6 +154,8 @@ private enum Query {

private boolean priorityEnabled;

private boolean checkDatabaseOnStart = true;

/**
* Convenient constructor for configuration use.
*/
Expand Down Expand Up @@ -411,6 +422,41 @@ public void afterPropertiesSet() {
this.jdbcTemplate.afterPropertiesSet();
}

/**
* The flag to perform a database check query on start or not.
* @param checkDatabaseOnStart false to not perform the database check.
* @since 6.2
*/
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) {
this.checkDatabaseOnStart = checkDatabaseOnStart;
if (!checkDatabaseOnStart) {
LOGGER.info("The 'DefaultLockRepository' won't be started automatically " +
"and required table is not going be checked.");
}
}

@Override
public boolean isAutoStartup() {
return this.checkDatabaseOnStart;
}

@Override
public void start() {
if (this.started.compareAndSet(false, true) && this.checkDatabaseOnStart) {
getMessageGroupCount(); // If no table in DB, an exception is thrown
}
}

@Override
public void stop() {
this.started.set(false);
}

@Override
public boolean isRunning() {
return this.started.get();
}

/**
* Store a message in the database. The groupId identifies the channel for which
* the message is to be stored.
Expand Down
Loading