Using the Uni or ManagedExecutor or other async calls does not release DB connection at end of transaction #34959
Description
Describe the bug
I have a Quartz job and an endpoint that call a service whose goal is to run a stored procedure, and I want this done asynchronously. I started by creating a Uni that calls the repository, which then executes the stored procedure. After the worker's transaction completes, the connection the worker was using stays in the active state in the database pool and does not go back to idle.
If the code is executed multiple times, it leaves multiple connections in the active state, one for each execution.
Since I have a maximum pool size of 20 (the default value), once all 20 connections are active the pod can no longer acquire new connections and I have to restart it.
I am using PostgreSQL version 13.11 as the database.
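To make the failure mode concrete, here is a minimal sketch in plain Java of how connections that are acquired but never returned exhaust a fixed-size pool. `TinyPool` and `leakyRun` are hypothetical stand-ins invented for this illustration; they are not Agroal or Quarkus APIs, and the sketch only models the symptom, not its cause.

```java
import java.util.concurrent.Semaphore;

public class PoolExhaustionSketch {

    /** Hypothetical stand-in for a connection pool with a fixed maximum size. */
    static final class TinyPool {
        final int maxSize;
        final Semaphore permits;

        TinyPool(int maxSize) {
            this.maxSize = maxSize;
            this.permits = new Semaphore(maxSize);
        }

        /** Tries to take a connection without blocking; false means the pool is exhausted. */
        boolean acquire() {
            return permits.tryAcquire();
        }

        /** Returns a connection to the pool (never called by the leaky run below). */
        void release() {
            permits.release();
        }

        /** Number of connections currently checked out. */
        int active() {
            return maxSize - permits.availablePermits();
        }
    }

    /** Models one execution of the job: it takes a connection and never gives it back. */
    static boolean leakyRun(TinyPool pool) {
        return pool.acquire();
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool(20); // same size as the default maximum mentioned above
        for (int i = 0; i < 20; i++) {
            leakyRun(pool);
        }
        System.out.println("active=" + pool.active());               // prints active=20
        System.out.println("21st acquire ok=" + leakyRun(pool));     // prints 21st acquire ok=false
    }
}
```

In this model, every execution permanently consumes one slot, so the 21st execution cannot obtain a connection, which matches the behaviour described above.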
Initial Code:
@ApplicationScoped
@Transactional(Transactional.TxType.REQUIRED)
public class EntityCommandServiceImpl implements EntityCommandService {
    private static final Logger LOGGER = Logger.getLogger(EntityCommandServiceImpl.class);
    private static final String ENTITY_METRIC_NAME = "entity";
    private static final String ENTITY_METRIC_DESCRIPTION =
            "Metrics regarding the process of fetching the entity.";

    @Inject
    EntityRepository entityRepository;

    @Override
    @Timed(value = ENTITY_METRIC_NAME, description = ENTITY_METRIC_DESCRIPTION)
    public void fetchEntity() {
        final UUID itemId = UUID.randomUUID();
        LOGGER.infof("Starting to fetch entity (processingId=%s).", itemId);
        Uni.createFrom()
                .item(itemId)
                .emitOn(Infrastructure.getDefaultWorkerPool())
                .subscribe()
                .with(this::worker);
    }

    private Uni<Void> worker(final UUID uuid) {
        LOGGER.infof("Starting worker for fetching entity (processingId=%s).", uuid.toString());
        try {
            this.entityRepository.fetchEntity();
        } catch (final StoredProcedureException exception) {
            LOGGER.infof("Could not fetch entity (processingId=%s).", uuid.toString());
        }
        LOGGER.infof("Finished worker for fetching entity (processingId=%s).", uuid.toString());
        return Uni.createFrom().voidItem();
    }
}
After identifying that the issue was here, I tried the following approach, which showed the same problem:
@ApplicationScoped
@Transactional(Transactional.TxType.REQUIRED)
public class EntityCommandServiceImpl implements EntityCommandService {
    private static final Logger LOGGER = Logger.getLogger(EntityCommandServiceImpl.class);
    private static final String ENTITY_METRIC_NAME = "entity";
    private static final String ENTITY_METRIC_DESCRIPTION =
            "Metrics regarding the process of fetching the entity.";

    @Inject
    EntityRepository entityRepository;

    @Override
    @Timed(value = ENTITY_METRIC_NAME, description = ENTITY_METRIC_DESCRIPTION)
    public void fetchEntity() {
        final UUID itemId = UUID.randomUUID();
        LOGGER.infof("Starting to fetch entity (processingId=%s).", itemId);
        ManagedExecutor executor = ManagedExecutor.builder()
                .maxAsync(5)
                .propagated(ThreadContext.CDI,
                        ThreadContext.TRANSACTION)
                .build();
        ThreadContext threadContext = ThreadContext.builder()
                .propagated(ThreadContext.CDI,
                        ThreadContext.TRANSACTION)
                .build();
        executor.runAsync(threadContext.contextualRunnable(() -> this.worker(itemId)));
    }

    private void worker(final UUID uuid) {
        LOGGER.infof("Starting worker for fetching entity (processingId=%s).", uuid.toString());
        try {
            this.entityRepository.fetchEntity();
        } catch (final StoredProcedureException exception) {
            LOGGER.infof("Could not fetch entity (processingId=%s).", uuid.toString());
        }
        LOGGER.infof("Finished worker for fetching entity (processingId=%s).", uuid.toString());
    }
}
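Both variants above are fire-and-forget: `subscribe().with(...)` and `runAsync(...)` schedule the worker and return immediately. A method-level `@Transactional` interceptor demarcates the transaction around the method call, so `fetchEntity()` can return (and its transaction boundary can be reached) while the worker is still running. The following plain-Java sketch only demonstrates that ordering; the class and event names are hypothetical, the latch stands in for a slow stored procedure, and whether this ordering is what keeps the connection active is not established by this report.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class FireAndForgetSketch {

    /** Runs one fire-and-forget call and returns the events in the order they happened. */
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch callerReturned = new CountDownLatch(1);

        // Stand-in for the body of fetchEntity(): schedule the worker, do not wait for it.
        CompletableFuture<Void> worker = CompletableFuture.runAsync(() -> {
            try {
                // Stand-in for a slow stored procedure: still running when the caller returns.
                callerReturned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("worker finished");
        });

        // The calling method is done here; a surrounding interceptor would now end its work.
        events.add("caller returned");
        callerReturned.countDown();

        // Joined only so that the sketch can report the final order.
        worker.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [caller returned, worker finished]
    }
}
```

A synchronous call does not have this gap, since the work finishes before the calling method returns.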
Repository:
@ApplicationScoped
@Transactional(Transactional.TxType.SUPPORTS)
public class EntityCustomRepositoryImpl implements EntityCustomRepository {
    private static final Logger LOGGER = Logger.getLogger(EntityCustomRepositoryImpl.class);
    private static final String FN_FETCH_ENTITY = "fn_fetch_entity";
    private static final String CONNECTION_STRING_PARAMETER = "dblink";
    private static final String RESULT_PARAMETER = "result";

    /**
     * host for connection.
     */
    @ConfigProperty(name = "host")
    String host;

    /**
     * port for connection.
     */
    @ConfigProperty(name = "port")
    int port;

    /**
     * database name for connection.
     */
    @ConfigProperty(name = "name")
    String name;

    /**
     * user name for connection.
     */
    @ConfigProperty(name = "user")
    String user;

    /**
     * password for connection.
     */
    @ConfigProperty(name = "password")
    String password;

    /**
     * The entity manager for the repository
     */
    @PersistenceContext
    EntityManager entityManager;

    @Override
    public void fetchEntity() throws StoredProcedureException {
        final FunctionResultDTO result = executeFetchEntityFunction();
        this.validateResultFetchEntity(result);
        LOGGER.info("Fetching entity executed successfully");
    }

    private FunctionResultDTO executeFetchEntityFunction() {
        final DBConnectionString connectionString = DBConnectionString.builder()
                .dbName(this.name)
                .host(this.host)
                .port(this.port)
                .user(this.user)
                .password(this.password)
                .build();
        final StoredProcedureQuery function = this.entityManager.createStoredProcedureQuery(FN_FETCH_ENTITY);
        function.registerStoredProcedureParameter(CONNECTION_STRING_PARAMETER, String.class, ParameterMode.IN);
        function.registerStoredProcedureParameter(RESULT_PARAMETER, String.class, ParameterMode.OUT);
        function.setParameter(CONNECTION_STRING_PARAMETER, connectionString.toString());
        function.execute();
        return this.jsonParserUtil.parse((String) function.getOutputParameterValue(RESULT_PARAMETER), FunctionResultDTO.class);
    }

    private void validateResultFetchEntity(final FunctionResultDTO functionResultDTO) throws StoredProcedureException {
        if (functionResultDTO == null) {
            throw new StoredProcedureException("Stored procedure returned null");
        }
        final int errorResult = 1;
        if (functionResultDTO.getExecutionStatus() == errorResult) {
            if (LOGGER.isEnabled(Logger.Level.ERROR)) {
                LOGGER.errorf("Function for fetching entity executed with error. "
                        + "Output: {executionStatus: %d, errorName: %s})", functionResultDTO.getExecutionStatus(),
                        functionResultDTO.getErrorName());
            }
            throw new StoredProcedureException(functionResultDTO.getErrorName());
        }
    }
}
Note that I also tried this without the ThreadContext.
After all this, and with no solution that I could find, I had to change it back to a synchronous implementation.
I also tried setting the max lifetime of the database connections, but it only closed the idle connections, not the active ones:
quarkus.datasource.jdbc.max-lifetime=5M
Expected behavior
I expect that at the end of the transaction the database connection goes back into the idle state.
Actual behavior
The database connection stays in the active state indefinitely.
How to Reproduce?
No response
Output of uname -a or ver
No response
Output of java -version
17.0.2
GraalVM version (if different from Java)
No response
Quarkus version or git rev
2.9.2.Final
Build tool (i.e. output of mvnw --version or gradlew --version)
maven 3.8.3
Additional information
No response