
A failing Item blocks processing of further item while using multi threaded step [BATCH-1832] #1757

Open
@spring-projects-issues


Kamal Govindraj opened BATCH-1832 and commented

We have a job configured that reads items from an SQS queue and processes them. We have set up a task executor with a throttle limit to allow multiple items to be processed in parallel. In a specific scenario the job stops picking up further items to process, even though there are items in the queue and the number of threads in use is below the throttle limit.

The following is the relevant snippet of configuration:

<tasklet task-executor="taskExecutor" throttle-limit="20">
    <chunk commit-interval="1" reader="testQueueReader" processor="testProcessor"
           writer="dummyWriter"/>
</tasklet>

On further investigation we found that this problem occurs only when processing of one of the items fails with an exception. The main thread then blocks until processing of another item finishes. In our case, processing a single item can take more than 30 minutes, and for that duration all the other threads are idle.

Stack trace:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.springframework.batch.repeat.support.ResultHolderResultQueue.take(ResultHolderResultQueue.java:134)
org.springframework.batch.repeat.support.ResultHolderResultQueue.take(ResultHolderResultQueue.java:33)
org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate.getNextResult(TaskExecutorRepeatTemplate.java:143)
org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)

I have investigated further: the problem is in either the ResultHolderResultQueue.isContinuable method or the TaskExecutorRepeatTemplate.ExecutingRunnable.run method. ResultHolderResultQueue does not behave correctly when a result with an error is put into it. The following test case replicates the issue:

@Test
public void testTakeBlocksForErrorResult() throws Exception {
    // Register two expected results, one per task
    queue.expect();
    queue.expect();

    // The second task takes a while to run
    Future<?> result = executorService.submit(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.put(new TestResultHolder(RepeatStatus.CONTINUABLE));
        }
    });

    // The first task finishes immediately with an error
    queue.put(new TestResultHolder(null, new Throwable()));
    assertFalse(queue.isEmpty());

    // Taking the first result should not wait for the second task to complete
    assertNull(queue.take().getResult());

    assertFalse("take blocks till the next task finishes", result.isDone());
}
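
To make the failure mode easier to reason about outside Spring Batch, here is a minimal, self-contained sketch of the behaviour the test exercises. It is a hypothetical model, not the Spring Batch source: `StallModel`, `ModelQueue`, `Holder` and both predicates are invented names. It assumes that `take()` holds back a non-continuable result until every expected result has arrived, which is consistent with the stack trace and the test above but has not been checked against the actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical, simplified model of the result queue; NOT the Spring Batch source.
class StallModel {

    /** Stand-in for a result holder: a status (null models "no result set") plus an optional error. */
    static final class Holder {
        final Boolean continuable;
        final Throwable error;

        Holder(Boolean continuable, Throwable error) {
            this.continuable = continuable;
            this.error = error;
        }
    }

    /** Treats only a non-null, continuable status as continuable (models the reported behaviour). */
    static final Predicate<Holder> RESULT_ONLY =
            h -> h.continuable != null && h.continuable;

    /** Additionally treats any holder that carries an error as continuable. */
    static final Predicate<Holder> RESULT_OR_ERROR =
            h -> (h.continuable != null && h.continuable) || h.error != null;

    static final class ModelQueue {
        private final Deque<Holder> results = new ArrayDeque<>();
        private final Object lock = new Object();
        private final Predicate<Holder> isContinuable;
        private int expected = 0;

        ModelQueue(Predicate<Holder> isContinuable) {
            this.isContinuable = isContinuable;
        }

        void expect() {
            synchronized (lock) {
                expected++;
            }
        }

        void put(Holder holder) {
            synchronized (lock) {
                results.add(holder);
                lock.notifyAll();
            }
        }

        Holder take() throws InterruptedException {
            synchronized (lock) {
                while (results.isEmpty()) {
                    lock.wait();
                }
                if (!isContinuable.test(results.peek())) {
                    // Assumed behaviour: wait until all expected results are in.
                    while (expected > results.size()) {
                        lock.wait();
                    }
                }
                expected--;
                return results.poll();
            }
        }
    }

    /** Returns how long take() took (ms) when one task fails at once and another runs for slowTaskMillis. */
    static long timeTake(Predicate<Holder> isContinuable, long slowTaskMillis) {
        ModelQueue queue = new ModelQueue(isContinuable);
        queue.expect();
        queue.expect();
        Thread slow = new Thread(() -> {
            try {
                Thread.sleep(slowTaskMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.put(new Holder(Boolean.TRUE, null));
        });
        slow.start();
        queue.put(new Holder(null, new RuntimeException("item failed")));
        try {
            long start = System.nanoTime();
            queue.take();
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            slow.join();
            return elapsedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("status-only check:     " + timeTake(RESULT_ONLY, 400) + " ms");
        System.out.println("status-or-error check: " + timeTake(RESULT_OR_ERROR, 400) + " ms");
    }
}
```

Under these assumptions, the status-only check makes `take()` wait roughly as long as the slow task runs, while the check that also accepts errors lets it return almost immediately.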

The bug can be fixed by modifying ResultHolderResultQueue.isContinuable as follows:

private boolean isContinuable(ResultHolder value) {
    return (value.getResult() != null && value.getResult().isContinuable()) || value.getError() != null;
}

Alternatively, it can be fixed by setting the result in the catch block of the TaskExecutorRepeatTemplate.ExecutingRunnable.run method.

I have attached a patch with the test case and both fixes; only one is required. I think TaskExecutorRepeatTemplate is the correct place to fix it, together with a check in ResultHolderResultQueue to disallow a null result.
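
For readers without the attachment, the catch-block approach could look roughly like the sketch below. This is not the attached patch and does not use the Spring Batch classes: `CatchBlockSketch`, `Task`, `Status` and `requireStatus` are hypothetical stand-ins. Which status to record on failure is also an assumption; `CONTINUABLE` is used here only so that a status-only check would no longer hold the failed result back.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration; NOT the Spring Batch classes or the attached patch.
class CatchBlockSketch {

    enum Status { CONTINUABLE, FINISHED }

    /** Runs one unit of work and records a status even when the work throws. */
    static final class Task implements Runnable {
        private final Callable<Status> work;
        private volatile Status status;
        private volatile Throwable error;

        Task(Callable<Status> work) {
            this.work = work;
        }

        @Override
        public void run() {
            try {
                status = work.call();
            } catch (Throwable t) {
                error = t;
                // Assumption: record a non-null status alongside the error.
                status = Status.CONTINUABLE;
            }
        }

        Status getStatus() {
            return status;
        }

        Throwable getError() {
            return error;
        }
    }

    /** Guard a queue could apply on put: reject a holder whose status was never set. */
    static Task requireStatus(Task task) {
        if (task.getStatus() == null) {
            throw new IllegalArgumentException("result must not be null");
        }
        return task;
    }

    public static void main(String[] args) {
        Task failing = new Task(() -> {
            throw new IllegalStateException("item failed");
        });
        failing.run();
        requireStatus(failing); // passes: the catch block set a status
        System.out.println(failing.getStatus() + " / " + failing.getError().getMessage());
    }
}
```

With the status always set by the task itself, the null check becomes a safety net that surfaces a missing result immediately instead of letting it stall the consumer.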


Affects: 2.1.8

Attachments:

2 votes, 2 watchers
