-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Add ProcessErrorListener for AsyncItemProcessor to allow notification of async processing failures #366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ProcessErrorListener for AsyncItemProcessor to allow notification of async processing failures #366
Conversation
… of async processing failures BATCH-2371
A couple thoughts about this PR:
Thoughts? |
I fully agree! |
Using the existing interface shouldn't add a circular dependency (we already depend on the |
@mminella I had a go on this, let me know what you think |
@mminella anything I can do on this one? |
@mminella can I do anything to get this going? |
The issue related to this PR (https://jira.spring.io/browse/BATCH-2371) has been resolved with a documentation update. |
@benas I think it is still valid. Yes it is documented, but it basically just says its not working the way you would expect it to work and this PR (or the idea behind it) will fix this. |
If I understand correctly, the goal is to listen to processing errors happening in the import java.util.Arrays;
import java.util.List;
import java.util.concurrent.FutureTask;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@Configuration
@EnableBatchProcessing
public class MyJob {
private final JobBuilderFactory jobs;
private final StepBuilderFactory steps;
public MyJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobs = jobs;
this.steps = steps;
}
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}
@Bean
public AsyncItemProcessor itemProcessor() {
AsyncItemProcessor<Integer, Integer> itemProcessor = new AsyncItemProcessor<>();
itemProcessor.setDelegate(item -> {
if (Thread.currentThread().getName().equals("processor-thread-3")) {
throw new ItemProcessingException("boom!", item);
}
System.out.println(Thread.currentThread().getName() + ": processing item = " + item);
return item * 2;
});
itemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor("processor-thread-"));
return itemProcessor;
}
@Bean
public AsyncItemWriter itemWriter() {
AsyncItemWriter<Integer> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(items -> {
for (Integer item : items) {
System.out.println("item = " + item);
}
});
return asyncItemWriter;
}
@Bean
public Step step() {
return steps.get("step")
.<Integer, Integer>chunk(5)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.listener((ItemWriteListener) new ItemProcessingExceptionHandler())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
class ItemProcessingException extends Exception {
private Object item;
ItemProcessingException(String message, Object item) {
super(message);
this.item = item;
}
public Object getItem() {
return item;
}
}
class ItemProcessingExceptionHandler extends ItemListenerSupport<Integer, FutureTask<Integer>> {
@Override
public void onWriteError(Exception exception, List<? extends FutureTask<Integer>> items) {
ItemProcessingException itemProcessingException = (ItemProcessingException) exception; // TODO exception type check
System.err.println(String.format("Error wile processing item %s due to '%s'",
itemProcessingException.getItem(),
itemProcessingException.getMessage()));
}
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
} In this example, I created a custom exception to encapsulate the item in error. The @imod Does this make sense? BTW, there would a clever way to achieve the requirement using a @Bean
public AsyncItemProcessor itemProcessor() {
AsyncItemProcessor<Integer, Integer> itemProcessor = new AsyncItemProcessor<>();
itemProcessor.setDelegate(delegate());
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); // or any other implementation of TaskExecutor
itemProcessor.setTaskExecutor(new ErrorHandlingTaskExecutor(taskExecutor, new ItemProcessingErrorHandler()));
return itemProcessor;
} Unfortunately, this is not possible because the |
@benas this looks good, thanks - would be nice to have this in the official documentation |
@imod ok thank you for your feedback
Sure! I think this can be added in the Asynchronous Processors section. If you want, you are welcome to update this PR with documentation changes. Otherwise, I will create another JIRA ticket (other than BATCH-2371) to update the docs. |
I created a JIRA ticket BATCH-2773 to update the documentation with the details of how to handle aynchronous processing errors as explained in the example above. I'm closing this PR. |
@benas sorry for my late reply - I will try to find time for it |
As mention in BATCH-2371, ItemProcessListener.onProcessError() is not called in parallel processing. This is quite a big drawback, as there is no fine grained way to react on processing errors anymore.
This PR adds a new Interface
ProcessErrorListener
which can be set onAsyncItemProcessor
. If it is set, thenProcessErrorListener
is called in case of an exception.