From 824b35f3f281d3e8136b2fc204684d3c83ef9bc1 Mon Sep 17 00:00:00 2001 From: "Palanisamy, Prakash" Date: Fri, 8 Mar 2024 21:26:35 +0530 Subject: [PATCH] chore(engine): add debug logs to improve job acquisition job execution logging * add test cases for the job-acquisition and job-execution logs related https://github.com/camunda/camunda-bpm-platform/issues/2666 --- .../jobexecutor/SpringJobExecutor.java | 11 +- .../jmx/services/JmxManagedThreadPool.java | 4 + .../engine/impl/jobexecutor/JobExecutor.java | 40 +++- .../impl/jobexecutor/JobExecutorLogger.java | 36 ++++ .../RuntimeContainerJobExecutor.java | 10 + .../SequentialJobAcquisitionRunnable.java | 3 + .../jobexecutor/ThreadPoolJobExecutor.java | 6 + .../test/jobexecutor/DelayDelegate.java | 29 +++ .../JobAcquisitionLoggingTest.java | 99 +++++++++ .../jobexecutor/JobExecutionLoggingTest.java | 195 ++++++++++++++++++ .../SimpleAsyncDelayProcess.bpmn20.xml | 19 ++ 11 files changed, 446 insertions(+), 6 deletions(-) create mode 100644 engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/DelayDelegate.java create mode 100644 engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobAcquisitionLoggingTest.java create mode 100644 engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobExecutionLoggingTest.java create mode 100644 engine/src/test/resources/org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml diff --git a/engine-spring/core/src/main/java/org/camunda/bpm/engine/spring/components/jobexecutor/SpringJobExecutor.java b/engine-spring/core/src/main/java/org/camunda/bpm/engine/spring/components/jobexecutor/SpringJobExecutor.java index a9911604208..f4a18643f0f 100644 --- a/engine-spring/core/src/main/java/org/camunda/bpm/engine/spring/components/jobexecutor/SpringJobExecutor.java +++ b/engine-spring/core/src/main/java/org/camunda/bpm/engine/spring/components/jobexecutor/SpringJobExecutor.java @@ -22,6 +22,7 @@ import org.camunda.bpm.engine.impl.ProcessEngineImpl; import org.camunda.bpm.engine.impl.jobexecutor.JobExecutor; import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * @@ -58,10 +59,16 @@ public void executeJobs(List jobIds, ProcessEngineImpl processEngine) { try { taskExecutor.execute(getExecuteJobsRunnable(jobIds, processEngine)); } catch (RejectedExecutionException e) { - logRejectedExecution(processEngine, jobIds.size()); rejectedJobsHandler.jobsRejected(jobIds, processEngine, this); - } + } finally { + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + logJobExecutionInfo(processEngine, ((ThreadPoolTaskExecutor) taskExecutor).getQueueSize(), + ((ThreadPoolTaskExecutor) taskExecutor).getQueueCapacity(), + ((ThreadPoolTaskExecutor) taskExecutor).getMaxPoolSize(), + ((ThreadPoolTaskExecutor) taskExecutor).getActiveCount()); + } + } } @Override diff --git a/engine/src/main/java/org/camunda/bpm/container/impl/jmx/services/JmxManagedThreadPool.java b/engine/src/main/java/org/camunda/bpm/container/impl/jmx/services/JmxManagedThreadPool.java index 60145a51340..449810a30b3 100644 --- a/engine/src/main/java/org/camunda/bpm/container/impl/jmx/services/JmxManagedThreadPool.java +++ b/engine/src/main/java/org/camunda/bpm/container/impl/jmx/services/JmxManagedThreadPool.java @@ -112,6 +112,10 @@ public int getQueueCount() { return threadPoolQueue.size(); } + public int getQueueAddlCapacity() { + return threadPoolQueue.remainingCapacity(); + } + public ThreadPoolExecutor getThreadPoolExecutor() { return threadPoolExecutor; } diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutor.java b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutor.java index e116b314033..f281202faa8 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutor.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutor.java @@ -171,11 +171,43 @@ public void logAcquisitionFailureJobs(ProcessEngineImpl engine, int numJobs) { } public void logRejectedExecution(ProcessEngineImpl engine, int numJobs) { - if (engine != null && engine.getProcessEngineConfiguration().isMetricsEnabled()) { - engine.getProcessEngineConfiguration() - .getMetricsRegistry() - .markOccurrence(Metrics.JOB_EXECUTION_REJECTED, numJobs); + if (engine != null) { + LOG.rejectedJobExecutions(engine.getName(), numJobs); + if (engine.getProcessEngineConfiguration().isMetricsEnabled()) { + engine.getProcessEngineConfiguration() + .getMetricsRegistry() + .markOccurrence(Metrics.JOB_EXECUTION_REJECTED, numJobs); + } + } + } + + public void logJobExecutionInfo(ProcessEngineImpl engine, + int executionQueueSize, + int executionQueueCapacity, + int maxExecutionThreads, + int activeExecutionThreads) { + if (engine != null) { + LOG.currentJobExecutions(engine.getName(), activeExecutionThreads); + LOG.numJobsInQueue(engine.getName(), executionQueueSize, executionQueueCapacity); + try { + LOG.availableJobExecutionThreads(engine.getName(), + Math.subtractExact(maxExecutionThreads, activeExecutionThreads)); + } catch (ArithmeticException arithmeticException) { + //arithmetic exception occurred while computing remaining available thread count for logging. + LOG.availableThreadsCalculationError(); + } + } + } + + public int calculateTotalQueueCapacity(int availableItems, int remainingCapacity) { + int totalQueueCapacity = 0; + try { + totalQueueCapacity = Math.addExact(availableItems, remainingCapacity); + } catch (ArithmeticException arithmeticException) { + //arithmetic exception occurred while computing Total Queue Capacity for logging. + LOG.totalQueueCapacityCalculationError(); } + return totalQueueCapacity; } // getters and setters ////////////////////////////////////////////////////// diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutorLogger.java b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutorLogger.java index 44c49e96266..4aaaab8faae 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutorLogger.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/JobExecutorLogger.java @@ -217,4 +217,40 @@ public ProcessEngineException jobExecutorPriorityRangeException(String reason) { return new ProcessEngineException(exceptionMessage("031", "Invalid configuration for job executor priority range. Reason: {}", reason)); } + public void failedAcquisitionLocks(String processEngine, AcquiredJobs acquiredJobs) { + logDebug("033", "Jobs failed to Lock during Acquisition of jobs for the process engine '{}' : {}", processEngine, + acquiredJobs.getNumberOfJobsFailedToLock()); + } + + public void jobsToAcquire(String processEngine, int numJobsToAcquire) { + logDebug("034", "Attempting to acquire {} jobs for the process engine '{}'", numJobsToAcquire, processEngine); + } + + public void rejectedJobExecutions(String processEngine, int numJobsRejected) { + logDebug("035", "Jobs execution rejections for the process engine '{}' : {}", processEngine, numJobsRejected); + } + + public void availableJobExecutionThreads(String processEngine, int numAvailableThreads) { + logDebug("036", "Available job execution threads for the process engine '{}' : {}", processEngine, + numAvailableThreads); + } + + public void currentJobExecutions(String processEngine, int numExecutions) { + logDebug("037", "Jobs currently in execution for the process engine '{}' : {}", processEngine, numExecutions); + } + + public void numJobsInQueue(String processEngine, int numJobsInQueue, int maxQueueSize) { + logDebug("038", + "Jobs currently in queue to be executed for the process engine '{}' : {} (out of the max queue size : {})", + processEngine, numJobsInQueue, maxQueueSize); + } + + public void availableThreadsCalculationError() { + logDebug("039", "Arithmetic exception occurred while computing remaining available thread count for logging."); + } + + public void totalQueueCapacityCalculationError() { + logDebug("040", "Arithmetic exception occurred while computing total queue capacity for logging."); + } + } diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/RuntimeContainerJobExecutor.java b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/RuntimeContainerJobExecutor.java index 337a488853c..d9693f209e8 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/RuntimeContainerJobExecutor.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/RuntimeContainerJobExecutor.java @@ -20,6 +20,7 @@ import org.camunda.bpm.container.ExecutorService; import org.camunda.bpm.container.RuntimeContainerDelegate; +import org.camunda.bpm.container.impl.jmx.services.JmxManagedThreadPool; import org.camunda.bpm.engine.ProcessEngineException; import org.camunda.bpm.engine.impl.ProcessEngineImpl; @@ -60,6 +61,15 @@ public void executeJobs(List jobIds, ProcessEngineImpl processEngine) { logRejectedExecution(processEngine, jobIds.size()); rejectedJobsHandler.jobsRejected(jobIds, processEngine, this); } + + if (executorService instanceof JmxManagedThreadPool) { + int totalQueueCapacity = calculateTotalQueueCapacity(((JmxManagedThreadPool) executorService).getQueueCount(), + ((JmxManagedThreadPool) executorService).getQueueAddlCapacity()); + + logJobExecutionInfo(processEngine, ((JmxManagedThreadPool) executorService).getQueueCount(), totalQueueCapacity, + ((JmxManagedThreadPool) executorService).getMaximumPoolSize(), + ((JmxManagedThreadPool) executorService).getActiveCount()); + } } protected RuntimeContainerDelegate getRuntimeContainerDelegate() { diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/SequentialJobAcquisitionRunnable.java b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/SequentialJobAcquisitionRunnable.java index cb3831e4a0a..408764d660b 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/SequentialJobAcquisitionRunnable.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/SequentialJobAcquisitionRunnable.java @@ -157,6 +157,8 @@ protected AcquiredJobs acquireJobs( int numJobsToAcquire = acquisitionStrategy.getNumJobsToAcquire(currentProcessEngine.getName()); + LOG.jobsToAcquire(currentProcessEngine.getName(), numJobsToAcquire); + AcquiredJobs acquiredJobs = null; if (numJobsToAcquire > 0) { @@ -173,6 +175,7 @@ protected AcquiredJobs acquireJobs( jobExecutor.logAcquisitionFailureJobs(currentProcessEngine, acquiredJobs.getNumberOfJobsFailedToLock()); LOG.acquiredJobs(currentProcessEngine.getName(), acquiredJobs); + LOG.failedAcquisitionLocks(currentProcessEngine.getName(), acquiredJobs); return acquiredJobs; } diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/ThreadPoolJobExecutor.java b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/ThreadPoolJobExecutor.java index 39deae159ff..068778a965b 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/ThreadPoolJobExecutor.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/jobexecutor/ThreadPoolJobExecutor.java @@ -47,6 +47,12 @@ public void executeJobs(List jobIds, ProcessEngineImpl processEngine) { logRejectedExecution(processEngine, jobIds.size()); rejectedJobsHandler.jobsRejected(jobIds, processEngine, this); + } finally { + int totalQueueCapacity = calculateTotalQueueCapacity(threadPoolExecutor.getQueue().size(), + threadPoolExecutor.getQueue().remainingCapacity()); + + logJobExecutionInfo(processEngine, threadPoolExecutor.getQueue().size(), totalQueueCapacity, + threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getPoolSize()); } } diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/DelayDelegate.java b/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/DelayDelegate.java new file mode 100644 index 00000000000..811b282a32f --- /dev/null +++ b/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/DelayDelegate.java @@ -0,0 +1,29 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.camunda.bpm.engine.test.jobexecutor; + +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; + +public class DelayDelegate implements JavaDelegate { + + @Override + public void execute(DelegateExecution execution) throws Exception { + Thread.sleep(2000L); + } + +} diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobAcquisitionLoggingTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobAcquisitionLoggingTest.java new file mode 100644 index 00000000000..84b5cd673d0 --- /dev/null +++ b/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobAcquisitionLoggingTest.java @@ -0,0 +1,99 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.camunda.bpm.engine.test.jobexecutor; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.camunda.bpm.engine.test.Deployment; +import org.camunda.bpm.engine.test.util.ProcessEngineTestRule; +import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule; +import org.camunda.commons.testing.ProcessEngineLoggingRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +public class JobAcquisitionLoggingTest { + + protected ProvidedProcessEngineRule engineRule = new ProvidedProcessEngineRule(); + public ProcessEngineTestRule testRule = new ProcessEngineTestRule(engineRule); + public ProcessEngineLoggingRule loggingRule = new ProcessEngineLoggingRule().watch( + "org.camunda.bpm.engine.jobexecutor", Level.DEBUG); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule(engineRule).around(testRule).around(loggingRule); + + protected RuntimeService runtimeService; + protected ProcessEngineConfigurationImpl processEngineConfiguration; + + @Before + public void init() { + runtimeService = engineRule.getRuntimeService(); + processEngineConfiguration = engineRule.getProcessEngineConfiguration(); + } + + @Test + @Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml" }) + public void shouldLogJobsAttemptingToAcquire() { + // Given three jobs + for (int i = 0; i < 3; i++) { + runtimeService.startProcessInstanceByKey("simpleAsyncProcess"); + } + + // When executing the jobs + processEngineConfiguration.getJobExecutor().start(); + testRule.waitForJobExecutorToProcessAllJobs(); + processEngineConfiguration.getJobExecutor().shutdown(); + + // Look for log where it states that "acquiring [set value of MaxJobPerAcquisition] jobs" + List filteredLogList = loggingRule.getFilteredLog( + "Attempting to acquire " + processEngineConfiguration.getJobExecutor().getMaxJobsPerAcquisition() + + " jobs for the process engine '" + processEngineConfiguration.getProcessEngineName() + "'"); + + // asserting for a minimum occurrence as acquisition cycle should have started + assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1); + } + + @Test + @Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml" }) + public void shouldLogFailedAcquisitionLocks() { + // Given three jobs + for (int i = 0; i < 3; i++) { + runtimeService.startProcessInstanceByKey("simpleAsyncProcess"); + } + + // when executing the jobs + processEngineConfiguration.getJobExecutor().start(); + testRule.waitForJobExecutorToProcessAllJobs(); + processEngineConfiguration.getJobExecutor().shutdown(); + + // Look for acquisition lock failures in logs. The logs should appear irrelevant of lock failure count of zero or + // more. + List filteredLogList = loggingRule.getFilteredLog( + "Jobs failed to Lock during Acquisition of jobs for the process engine '" + + processEngineConfiguration.getProcessEngineName() + "' : "); + + // Then observe the log appearing minimum 1 time, considering minimum 1 acquisition cycle + assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1); + } +} diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobExecutionLoggingTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobExecutionLoggingTest.java new file mode 100644 index 00000000000..484fc342f2a --- /dev/null +++ b/engine/src/test/java/org/camunda/bpm/engine/test/jobexecutor/JobExecutionLoggingTest.java @@ -0,0 +1,195 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.camunda.bpm.engine.test.jobexecutor; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.camunda.bpm.engine.impl.jobexecutor.CallerRunsRejectedJobsHandler; +import org.camunda.bpm.engine.impl.jobexecutor.DefaultJobExecutor; +import org.camunda.bpm.engine.test.Deployment; +import org.camunda.bpm.engine.test.util.ProcessEngineTestRule; +import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule; +import org.camunda.commons.testing.ProcessEngineLoggingRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class JobExecutionLoggingTest { + + protected ProvidedProcessEngineRule engineRule = new ProvidedProcessEngineRule(); + public ProcessEngineTestRule testRule = new ProcessEngineTestRule(engineRule); + public ProcessEngineLoggingRule loggingRule = new ProcessEngineLoggingRule().watch( + "org.camunda.bpm.engine.jobexecutor", Level.DEBUG); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule(engineRule).around(testRule).around(loggingRule); + + protected RuntimeService runtimeService; + protected ProcessEngineConfigurationImpl processEngineConfiguration; + + @Before + public void init() { + runtimeService = engineRule.getRuntimeService(); + processEngineConfiguration = engineRule.getProcessEngineConfiguration(); + } + + @Test + @Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml" }) + public void shouldLogJobsQueuedForExecution() { + // Replace job executor with one that has custom threadpool executor settings + JobExecutionLoggingTest.TestJobExecutor testJobExecutor = new JobExecutionLoggingTest.TestJobExecutor(); + testJobExecutor.setMaxJobsPerAcquisition(10); + processEngineConfiguration.setJobExecutor(testJobExecutor); + testJobExecutor.registerProcessEngine(processEngineConfiguration.getProcessEngine()); + + // Given five jobs + for (int i = 0; i < 5; i++) { + runtimeService.startProcessInstanceByKey("simpleAsyncDelayProcess"); + } + + // When executing the jobs + processEngineConfiguration.getJobExecutor().start(); + testRule.waitForJobExecutorToProcessAllJobs(7000L); + processEngineConfiguration.getJobExecutor().shutdown(); + + // Look for filled queue logs + List filteredLogList = loggingRule.getFilteredLog("Jobs currently in queue to be executed for the " + + "process engine '" + processEngineConfiguration.getProcessEngineName() + "' : 2 (out of the max queue size " + + ": " + testJobExecutor.queueSize + ")"); + + // Then minimum one instance of filled queue log will be available. + assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1); + } + + @Test + @Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml" }) + public void shouldLogJobsInExecution() { + // Replace job executor with one that has custom threadpool executor settings + JobExecutionLoggingTest.TestJobExecutor testJobExecutor = new JobExecutionLoggingTest.TestJobExecutor(); + processEngineConfiguration.setJobExecutor(testJobExecutor); + testJobExecutor.registerProcessEngine(processEngineConfiguration.getProcessEngine()); + + // Given one job + runtimeService.startProcessInstanceByKey("simpleAsyncDelayProcess"); + + // When executing the jobs + processEngineConfiguration.getJobExecutor().start(); + testRule.waitForJobExecutorToProcessAllJobs(5000L); + processEngineConfiguration.getJobExecutor().shutdown(); + + // Look for jobs in execution + List filteredLogList = loggingRule.getFilteredLog( + "Jobs currently in execution for the process engine '" + processEngineConfiguration.getProcessEngineName() + + "' : 1"); + + // Since the execution will be happening for a while the check is made for more than one occurrence of the log. + assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1); + } + + @Test + @Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml" }) + public void shouldLogAvailableJobExecutionThreads() { + // Replace job executor with one that has custom threadpool executor settings + JobExecutionLoggingTest.TestJobExecutor testJobExecutor = new JobExecutionLoggingTest.TestJobExecutor(); + processEngineConfiguration.setJobExecutor(testJobExecutor); + testJobExecutor.registerProcessEngine(processEngineConfiguration.getProcessEngine()); + + // Given one job + runtimeService.startProcessInstanceByKey("simpleAsyncDelayProcess"); + + // When executing the jobs + processEngineConfiguration.getJobExecutor().start(); + testRule.waitForJobExecutorToProcessAllJobs(5000L); + processEngineConfiguration.getJobExecutor().shutdown(); + + // Look for available job execution threads logs + List filteredLogList = loggingRule.getFilteredLog( + "Available job execution threads for the process engine '" + processEngineConfiguration.getProcessEngineName() + + "' : 2"); + + // Since the execution will be happening for a while the check is made for more than one occurrence of the log. + assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1); + } + + @Test + @Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/delegateThrowsException.bpmn20.xml" }) + public void shouldLogJobExecutionRejections() { + // Given three jobs + for (int i = 0; i < 3; i++) { + runtimeService.startProcessInstanceByKey("testProcess"); + } + + // Replace job executor with one that rejects all jobs + JobExecutionLoggingTest.RejectionJobExecutor rejectionExecutor = new JobExecutionLoggingTest.RejectionJobExecutor(); + processEngineConfiguration.setJobExecutor(rejectionExecutor); + rejectionExecutor.registerProcessEngine(processEngineConfiguration.getProcessEngine()); + + // When executing the jobs + processEngineConfiguration.getJobExecutor().start(); + testRule.waitForJobExecutorToProcessAllJobs(5000L); + processEngineConfiguration.getJobExecutor().shutdown(); + + // Look for job execution rejection job count with one job + List filteredLogList = loggingRule.getFilteredLog( + "Jobs execution rejections for the process engine '" + processEngineConfiguration.getProcessEngineName() + + "' : "); + + // Minimum occurrences of the log is three + assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(3); + } + + public static class TestJobExecutor extends DefaultJobExecutor { + + protected int queueSize = 2; + protected int corePoolSize = 1; + protected int maxPoolSize = 3; + protected BlockingQueue threadPoolQueue; + + public TestJobExecutor() { + threadPoolQueue = new ArrayBlockingQueue<>(queueSize); + threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, + threadPoolQueue); + threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + } + } + + public static class RejectionJobExecutor extends TestJobExecutor { + public RejectionJobExecutor() { + threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, + threadPoolQueue) { + @Override + public void execute(Runnable command) { + throw new RejectedExecutionException(); + } + }; + rejectedJobsHandler = new CallerRunsRejectedJobsHandler(); + } + } + +} \ No newline at end of file diff --git a/engine/src/test/resources/org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml b/engine/src/test/resources/org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml new file mode 100644 index 00000000000..63db2478b7e --- /dev/null +++ b/engine/src/test/resources/org/camunda/bpm/engine/test/jobexecutor/SimpleAsyncDelayProcess.bpmn20.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + +