Skip to content

Commit

Permalink
chore(engine): add debug logs to improve job acquisition job executio…
Browse files Browse the repository at this point in the history
…n logging

* add test cases for the job-acquisition and job-execution logs

related camunda#2666
  • Loading branch information
prakashpalanisamy authored and yanavasileva committed Dec 2, 2024
1 parent 536442d commit 824b35f
Show file tree
Hide file tree
Showing 11 changed files with 446 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.camunda.bpm.engine.impl.ProcessEngineImpl;
import org.camunda.bpm.engine.impl.jobexecutor.JobExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
*
Expand Down Expand Up @@ -58,10 +59,16 @@ public void executeJobs(List<String> jobIds, ProcessEngineImpl processEngine) {
try {
taskExecutor.execute(getExecuteJobsRunnable(jobIds, processEngine));
} catch (RejectedExecutionException e) {

logRejectedExecution(processEngine, jobIds.size());
rejectedJobsHandler.jobsRejected(jobIds, processEngine, this);
}
} finally {
if (taskExecutor instanceof ThreadPoolTaskExecutor) {
logJobExecutionInfo(processEngine, ((ThreadPoolTaskExecutor) taskExecutor).getQueueSize(),
((ThreadPoolTaskExecutor) taskExecutor).getQueueCapacity(),
((ThreadPoolTaskExecutor) taskExecutor).getMaxPoolSize(),
((ThreadPoolTaskExecutor) taskExecutor).getActiveCount());
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public int getQueueCount() {
return threadPoolQueue.size();
}

public int getQueueAddlCapacity() {
return threadPoolQueue.remainingCapacity();
}

public ThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,43 @@ public void logAcquisitionFailureJobs(ProcessEngineImpl engine, int numJobs) {
}

public void logRejectedExecution(ProcessEngineImpl engine, int numJobs) {
if (engine != null && engine.getProcessEngineConfiguration().isMetricsEnabled()) {
engine.getProcessEngineConfiguration()
.getMetricsRegistry()
.markOccurrence(Metrics.JOB_EXECUTION_REJECTED, numJobs);
if (engine != null) {
LOG.rejectedJobExecutions(engine.getName(), numJobs);
if (engine.getProcessEngineConfiguration().isMetricsEnabled()) {
engine.getProcessEngineConfiguration()
.getMetricsRegistry()
.markOccurrence(Metrics.JOB_EXECUTION_REJECTED, numJobs);
}
}
}

public void logJobExecutionInfo(ProcessEngineImpl engine,
int executionQueueSize,
int executionQueueCapacity,
int maxExecutionThreads,
int activeExecutionThreads) {
if (engine != null) {
LOG.currentJobExecutions(engine.getName(), activeExecutionThreads);
LOG.numJobsInQueue(engine.getName(), executionQueueSize, executionQueueCapacity);
try {
LOG.availableJobExecutionThreads(engine.getName(),
Math.subtractExact(maxExecutionThreads, activeExecutionThreads));
} catch (ArithmeticException arithmeticException) {
//arithmetic exception occurred while computing remaining available thread count for logging.
LOG.availableThreadsCalculationError();
}
}
}

public int calculateTotalQueueCapacity(int availableItems, int remainingCapacity) {
int totalQueueCapacity = 0;
try {
totalQueueCapacity = Math.addExact(availableItems, remainingCapacity);
} catch (ArithmeticException arithmeticException) {
//arithmetic exception occurred while computing Total Queue Capacity for logging.
LOG.totalQueueCapacityCalculationError();
}
return totalQueueCapacity;
}

// getters and setters //////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,40 @@ public ProcessEngineException jobExecutorPriorityRangeException(String reason) {
return new ProcessEngineException(exceptionMessage("031", "Invalid configuration for job executor priority range. Reason: {}", reason));
}

public void failedAcquisitionLocks(String processEngine, AcquiredJobs acquiredJobs) {
logDebug("033", "Jobs failed to Lock during Acquisition of jobs for the process engine '{}' : {}", processEngine,
acquiredJobs.getNumberOfJobsFailedToLock());
}

public void jobsToAcquire(String processEngine, int numJobsToAcquire) {
logDebug("034", "Attempting to acquire {} jobs for the process engine '{}'", numJobsToAcquire, processEngine);
}

public void rejectedJobExecutions(String processEngine, int numJobsRejected) {
logDebug("035", "Jobs execution rejections for the process engine '{}' : {}", processEngine, numJobsRejected);
}

public void availableJobExecutionThreads(String processEngine, int numAvailableThreads) {
logDebug("036", "Available job execution threads for the process engine '{}' : {}", processEngine,
numAvailableThreads);
}

public void currentJobExecutions(String processEngine, int numExecutions) {
logDebug("037", "Jobs currently in execution for the process engine '{}' : {}", processEngine, numExecutions);
}

public void numJobsInQueue(String processEngine, int numJobsInQueue, int maxQueueSize) {
logDebug("038",
"Jobs currently in queue to be executed for the process engine '{}' : {} (out of the max queue size : {})",
processEngine, numJobsInQueue, maxQueueSize);
}

public void availableThreadsCalculationError() {
logDebug("039", "Arithmetic exception occurred while computing remaining available thread count for logging.");
}

public void totalQueueCapacityCalculationError() {
logDebug("040", "Arithmetic exception occurred while computing total queue capacity for logging.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.camunda.bpm.container.ExecutorService;
import org.camunda.bpm.container.RuntimeContainerDelegate;
import org.camunda.bpm.container.impl.jmx.services.JmxManagedThreadPool;
import org.camunda.bpm.engine.ProcessEngineException;
import org.camunda.bpm.engine.impl.ProcessEngineImpl;

Expand Down Expand Up @@ -60,6 +61,15 @@ public void executeJobs(List<String> jobIds, ProcessEngineImpl processEngine) {
logRejectedExecution(processEngine, jobIds.size());
rejectedJobsHandler.jobsRejected(jobIds, processEngine, this);
}

if (executorService instanceof JmxManagedThreadPool) {
int totalQueueCapacity = calculateTotalQueueCapacity(((JmxManagedThreadPool) executorService).getQueueCount(),
((JmxManagedThreadPool) executorService).getQueueAddlCapacity());

logJobExecutionInfo(processEngine, ((JmxManagedThreadPool) executorService).getQueueCount(), totalQueueCapacity,
((JmxManagedThreadPool) executorService).getMaximumPoolSize(),
((JmxManagedThreadPool) executorService).getActiveCount());
}
}

protected RuntimeContainerDelegate getRuntimeContainerDelegate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ protected AcquiredJobs acquireJobs(

int numJobsToAcquire = acquisitionStrategy.getNumJobsToAcquire(currentProcessEngine.getName());

LOG.jobsToAcquire(currentProcessEngine.getName(), numJobsToAcquire);

AcquiredJobs acquiredJobs = null;

if (numJobsToAcquire > 0) {
Expand All @@ -173,6 +175,7 @@ protected AcquiredJobs acquireJobs(
jobExecutor.logAcquisitionFailureJobs(currentProcessEngine, acquiredJobs.getNumberOfJobsFailedToLock());

LOG.acquiredJobs(currentProcessEngine.getName(), acquiredJobs);
LOG.failedAcquisitionLocks(currentProcessEngine.getName(), acquiredJobs);

return acquiredJobs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public void executeJobs(List<String> jobIds, ProcessEngineImpl processEngine) {
logRejectedExecution(processEngine, jobIds.size());
rejectedJobsHandler.jobsRejected(jobIds, processEngine, this);

} finally {
int totalQueueCapacity = calculateTotalQueueCapacity(threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getQueue().remainingCapacity());

logJobExecutionInfo(processEngine, threadPoolExecutor.getQueue().size(), totalQueueCapacity,
threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getPoolSize());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.engine.test.jobexecutor;

import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;

public class DelayDelegate implements JavaDelegate {

@Override
public void execute(DelegateExecution execution) throws Exception {
Thread.sleep(2000L);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.engine.test.jobexecutor;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

public class JobAcquisitionLoggingTest {

protected ProvidedProcessEngineRule engineRule = new ProvidedProcessEngineRule();
public ProcessEngineTestRule testRule = new ProcessEngineTestRule(engineRule);
public ProcessEngineLoggingRule loggingRule = new ProcessEngineLoggingRule().watch(
"org.camunda.bpm.engine.jobexecutor", Level.DEBUG);

@Rule
public RuleChain ruleChain = RuleChain.outerRule(engineRule).around(testRule).around(loggingRule);

protected RuntimeService runtimeService;
protected ProcessEngineConfigurationImpl processEngineConfiguration;

@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
}

@Test
@Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml" })
public void shouldLogJobsAttemptingToAcquire() {
// Given three jobs
for (int i = 0; i < 3; i++) {
runtimeService.startProcessInstanceByKey("simpleAsyncProcess");
}

// When executing the jobs
processEngineConfiguration.getJobExecutor().start();
testRule.waitForJobExecutorToProcessAllJobs();
processEngineConfiguration.getJobExecutor().shutdown();

// Look for log where it states that "acquiring [set value of MaxJobPerAcquisition] jobs"
List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog(
"Attempting to acquire " + processEngineConfiguration.getJobExecutor().getMaxJobsPerAcquisition()
+ " jobs for the process engine '" + processEngineConfiguration.getProcessEngineName() + "'");

// asserting for a minimum occurrence as acquisition cycle should have started
assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1);
}

@Test
@Deployment(resources = { "org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml" })
public void shouldLogFailedAcquisitionLocks() {
// Given three jobs
for (int i = 0; i < 3; i++) {
runtimeService.startProcessInstanceByKey("simpleAsyncProcess");
}

// when executing the jobs
processEngineConfiguration.getJobExecutor().start();
testRule.waitForJobExecutorToProcessAllJobs();
processEngineConfiguration.getJobExecutor().shutdown();

// Look for acquisition lock failures in logs. The logs should appear irrelevant of lock failure count of zero or
// more.
List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog(
"Jobs failed to Lock during Acquisition of jobs for the process engine '"
+ processEngineConfiguration.getProcessEngineName() + "' : ");

// Then observe the log appearing minimum 1 time, considering minimum 1 acquisition cycle
assertThat(filteredLogList.size()).isGreaterThanOrEqualTo(1);
}
}
Loading

0 comments on commit 824b35f

Please sign in to comment.