Using Uni, ManagedExecutor, or other async calls does not release the DB connection at the end of the transaction #34959

Open
@psycokilll

Description

Describe the bug

I have a Quartz job and an endpoint that both call a service whose goal is to run a stored procedure, and I want this done asynchronously. I started by creating a Uni that calls the repository, which then executes the stored procedure. After the worker's transaction completes, the pooled database connection that the worker was using stays in the active state and never goes back to idle.
If the code is executed multiple times, it leaves as many connections in the active state as the number of times it was executed.
Since I have a max pool size of 20 (the default value), once all 20 connections are active the pod can no longer acquire new connections and I have to restart it.
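To make the failure mode concrete, here is a minimal sketch in plain Java. It does not use Quarkus or the real connection pool: `FakePool` and `runLeakyJobs` are hypothetical names invented for illustration, and a `Semaphore` with 20 permits stands in for the pool. Each simulated call takes a connection and never hands it back, which is the behavior described above.

```java
import java.util.concurrent.Semaphore;

public class PoolExhaustionSketch {

    /** Hypothetical stand-in for a JDBC pool: each permit represents one connection. */
    static final class FakePool {
        private final int maxSize;
        private final Semaphore permits;

        FakePool(int maxSize) {
            this.maxSize = maxSize;
            this.permits = new Semaphore(maxSize);
        }

        /** Returns true if a connection could be taken without waiting. */
        boolean tryAcquire() {
            return permits.tryAcquire();
        }

        /** Hands a connection back to the pool (never called by the leaky jobs). */
        void release() {
            permits.release();
        }

        /** Number of connections currently checked out. */
        int active() {
            return maxSize - permits.availablePermits();
        }
    }

    /**
     * Simulates `calls` executions that each take a connection and never
     * release it. Returns how many executions could not get a connection.
     */
    static int runLeakyJobs(FakePool pool, int calls) {
        int rejected = 0;
        for (int i = 0; i < calls; i++) {
            if (!pool.tryAcquire()) {
                rejected++;
            }
            // The leak: release() is intentionally never called here.
        }
        return rejected;
    }

    public static void main(String[] args) {
        FakePool pool = new FakePool(20); // 20 matches the max pool size in the report
        int rejected = runLeakyJobs(pool, 25);
        System.out.println("active=" + pool.active() + " rejected=" + rejected);
    }
}
```

With a pool of 20 and 25 leaky calls, the first 20 calls hold their connections forever and the remaining 5 are rejected, which mirrors the point at which the pod has to be restarted.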

I am using PostgreSQL version 13.11 as the database.

Initial Code:

@ApplicationScoped
@Transactional(Transactional.TxType.REQUIRED)
public class EntityCommandServiceImpl implements EntityCommandService {

    private static final Logger LOGGER = Logger.getLogger(EntityCommandServiceImpl.class);
    private static final String ENTITY_METRIC_NAME = "entity";
    private static final String ENTITY_METRIC_DESCRIPTION =
            "Metrics regarding the process of fetching the entity.";

    @Inject
    EntityRepository entityRepository;

    @Override
    @Timed(value = ENTITY_METRIC_NAME, description = ENTITY_METRIC_DESCRIPTION)
    public void fetchEntity() {
        final UUID itemId = UUID.randomUUID();
        LOGGER.infof("Starting to fetch entity (processingId=%s).", itemId);

        Uni.createFrom()
                .item(itemId)
                .emitOn(Infrastructure.getDefaultWorkerPool())
                .subscribe()
                .with(this::worker);
    }

    private Uni<Void> worker(final UUID uuid) {
        LOGGER.infof("Starting worker for fetching entity (processingId=%s).", uuid.toString());

        try {
            this.entityRepository.fetchEntity();
        } catch (final StoredProcedureException exception) {
            LOGGER.infof("Could not fetch entity (processingId=%s).", uuid.toString());
        }

        LOGGER.infof("Finished worker for fetching entity (processingId=%s).", uuid.toString());
        return Uni.createFrom().voidItem();
    }
}

After I identified that the issue was here, I tried the following alternative, which had the same problem:

@ApplicationScoped
@Transactional(Transactional.TxType.REQUIRED)
public class EntityCommandServiceImpl implements EntityCommandService {

    private static final Logger LOGGER = Logger.getLogger(EntityCommandServiceImpl.class);
    private static final String ENTITY_METRIC_NAME = "entity";
    private static final String ENTITY_METRIC_DESCRIPTION =
            "Metrics regarding the process of fetching the entity.";

    @Inject
    EntityRepository entityRepository;

    @Override
    @Timed(value = ENTITY_METRIC_NAME, description = ENTITY_METRIC_DESCRIPTION)
    public void fetchEntity() {
        final UUID itemId = UUID.randomUUID();
        LOGGER.infof("Starting to fetch entity (processingId=%s).", itemId);

        ManagedExecutor executor = ManagedExecutor.builder()
                .maxAsync(5)
                .propagated(ThreadContext.CDI,
                            ThreadContext.TRANSACTION)
                .build();

        ThreadContext threadContext = ThreadContext.builder()
                .propagated(ThreadContext.CDI,
                            ThreadContext.TRANSACTION)
                .build();

        executor.runAsync(threadContext.contextualRunnable(() -> this.worker(itemId)));
    }

    private void worker(final UUID uuid) {
        LOGGER.infof("Starting worker for fetching entity (processingId=%s).", uuid.toString());

        try {
            this.entityRepository.fetchEntity();
        } catch (final StoredProcedureException exception) {
            LOGGER.infof("Could not fetch entity (processingId=%s).", uuid.toString());
        }

        LOGGER.infof("Finished worker for fetching entity (processingId=%s).", uuid.toString());
    }
}

Repository:

@ApplicationScoped
@Transactional(Transactional.TxType.SUPPORTS)
public class EntityCustomRepositoryImpl implements EntityCustomRepository {

    private static final Logger LOGGER = Logger.getLogger(EntityCustomRepositoryImpl.class);

    private static final String FN_FETCH_ENTITY = "fn_fetch_entity";
    private static final String CONNECTION_STRING_PARAMETER = "dblink";
    private static final String RESULT_PARAMETER = "result";

    /**
     * host for connection.
     */
    @ConfigProperty(name = "host")
    String host;

    /**
     * port for connection.
     */
    @ConfigProperty(name = "port")
    int port;

    /**
     * database name for connection.
     */
    @ConfigProperty(name = "name")
    String name;

    /**
     * user name for connection.
     */
    @ConfigProperty(name = "user")
    String user;

    /**
     * password for connection.
     */
    @ConfigProperty(name = "password")
    String password;

    /**
     * The entity manager for the repository
     */
    @PersistenceContext
    EntityManager entityManager;


    @Override
    public void fetchEntity() throws StoredProcedureException {
        final FunctionResultDTO result = executeFetchEntityFunction();
        this.validateResultFetchEntity(result);
        LOGGER.info("Fetching entity executed successfully");
    }

    private FunctionResultDTO executeFetchEntityFunction() {
        final DBConnectionString connectionString = DBConnectionString.builder()
                .dbName(this.name)
                .host(this.host)
                .port(this.port)
                .user(this.user)
                .password(this.password)
                .build();

        final StoredProcedureQuery function = this.entityManager.createStoredProcedureQuery(FN_FETCH_ENTITY);
        function.registerStoredProcedureParameter(CONNECTION_STRING_PARAMETER, String.class, ParameterMode.IN);
        function.registerStoredProcedureParameter(RESULT_PARAMETER, String.class, ParameterMode.OUT);
        function.setParameter(CONNECTION_STRING_PARAMETER, connectionString.toString());

        function.execute();

        // Note: jsonParserUtil is a helper whose declaration is not shown in this snippet.
        return this.jsonParserUtil.parse((String) function.getOutputParameterValue(RESULT_PARAMETER), FunctionResultDTO.class);
    }

    private void validateResultFetchEntity(final FunctionResultDTO functionResultDTO) throws StoredProcedureException {
        if (functionResultDTO == null) {
            throw new StoredProcedureException("Stored procedure returned null");
        }

        final int errorResult = 1;
        if (functionResultDTO.getExecutionStatus() == errorResult) {
            if (LOGGER.isEnabled(Logger.Level.ERROR)) {
                LOGGER.errorf("Function for fetching entity executed with error. "
                              + "Output: {executionStatus: %d, errorName: %s})", functionResultDTO.getExecutionStatus(),
                              functionResultDTO.getErrorName());
            }

            throw new StoredProcedureException(functionResultDTO.getErrorName());
        }
    }
}

Note that I also tried without the ThreadContext.

After all this, with no solution that I could find, I had to change it back to a synchronous solution.

I tried to set the max lifetime of the database connections, but it only killed the idle connections, not the active ones:
quarkus.datasource.jdbc.max-lifetime=5M

Expected behavior

I expect that at the end of the transaction the database connection goes back to the idle state.
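As an illustration of this expectation only, the following plain-Java sketch runs the work on another thread and hands the connection back when the unit of work ends. It is not Quarkus or Agroal code: the `Semaphore` is an assumed stand-in for the pool, and `worker`, `runAsyncJobs`, and `active` are hypothetical names. The point it shows is that when acquire and release both happen inside the asynchronous task, no connection remains active once all tasks have finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ReleaseOnCompletionSketch {

    static final int MAX_POOL_SIZE = 20;

    // Assumption: a semaphore stands in for the connection pool.
    static final Semaphore CONNECTIONS = new Semaphore(MAX_POOL_SIZE);

    /** Number of simulated connections currently checked out. */
    static int active() {
        return MAX_POOL_SIZE - CONNECTIONS.availablePermits();
    }

    /** One unit of work: take a connection, do the work, always give it back. */
    static void worker() {
        CONNECTIONS.acquireUninterruptibly();
        try {
            // The stored procedure call would happen here.
        } finally {
            // Releasing in finally returns the connection even if the work throws.
            CONNECTIONS.release();
        }
    }

    /** Runs `calls` workers asynchronously and returns the active count afterwards. */
    static int runAsyncJobs(int calls) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[calls];
            for (int i = 0; i < calls; i++) {
                futures[i] = CompletableFuture.runAsync(ReleaseOnCompletionSketch::worker, executor);
            }
            // Wait until every asynchronous worker has completed.
            CompletableFuture.allOf(futures).join();
            return active();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("active after 50 async jobs: " + runAsyncJobs(50));
    }
}
```

In this sketch the number of executions can exceed the pool size without exhausting it, because each worker returns its connection before the next one needs it.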

Actual behavior

The database connection stays in the active state forever.

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

17.0.2

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.9.2.Final

Build tool (ie. output of mvnw --version or gradlew --version)

maven 3.8.3

Additional information

No response
