Skip to content

Add ProcessErrorListener for AsyncItemProcessor to allow notification of async processing failures #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

imod
Copy link

@imod imod commented May 19, 2015

As mention in BATCH-2371, ItemProcessListener.onProcessError() is not called in parallel processing. This is quite a big drawback, as there is no fine grained way to react on processing errors anymore.

This PR adds a new Interface ProcessErrorListener which can be set on AsyncItemProcessor. If it is set, then ProcessErrorListener is called in case of an exception.

@mminella
Copy link
Member

mminella commented Jun 8, 2015

A couple thoughts about this PR:

  1. I'd rather use the normal ItemProcessListener for this instead of a new interface that essentially duplicates the functionality.
  2. I'm thinking the framework should be able to detect that the AsyncItemProcessor is in use and inject the listener instance into the wrapper instead of requiring the user to wire that up (just like we don't require a user to wire their listeners into the various other components that are used behind the scenes for processing a step).

Thoughts?

@imod
Copy link
Author

imod commented Jun 9, 2015

I fully agree!
The reason why I did not reuse the normal ItemProcessListener, was my fear of circular dependencies (although I did not really check this in more details).

@mminella
Copy link
Member

mminella commented Jun 9, 2015

Using the existing interface shouldn't add a circular dependency (we already depend on the ItemProcessor interface). I'd be more worried about a circular dependency or package tangle with regards to how 2 is implemented...

@imod
Copy link
Author

imod commented Jun 11, 2015

@mminella I had a go on this, let me know what you think

@imod
Copy link
Author

imod commented Aug 18, 2015

@mminella anything I can do on this one?

@imod
Copy link
Author

imod commented Oct 5, 2015

@mminella can I do anything to get this going?

@fmbenhassine
Copy link
Contributor

The issue related to this PR (https://jira.spring.io/browse/BATCH-2371) has been resolved with a documentation update.

@imod @mminella Is this PR still relevant?

@imod
Copy link
Author

imod commented Oct 8, 2018

@benas I think it is still valid. Yes it is documented, but it basically just says its not working the way you would expect it to work and this PR (or the idea behind it) will fix this.

@fmbenhassine
Copy link
Contributor

fmbenhassine commented Nov 8, 2018

this PR (or the idea behind it) will fix this.

If I understand correctly, the goal is to listen to processing errors happening in the AsyncItemProcessor. This is actually possible without the need for a new listener API. Since the AsyncItemProcessor submits a FutureTask to the task executor, the only way to know if an exception happened in the task is by unwrapping the future (the exception will be wrapped in a java.util.concurrent.ExecutionException when the FutureTask.get is called). Now since the future is unwrapped in the AsyncItemWriter, we can use an ItemWriteListener and react to processing errors. Here is a quick example:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.FutureTask;

import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class MyJob {

	private final JobBuilderFactory jobs;

	private final StepBuilderFactory steps;

	public MyJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
		this.jobs = jobs;
		this.steps = steps;
	}

	@Bean
	public ItemReader<Integer> itemReader() {
		return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
	}

	@Bean
	public AsyncItemProcessor itemProcessor() {
		AsyncItemProcessor<Integer, Integer> itemProcessor = new AsyncItemProcessor<>();
		itemProcessor.setDelegate(item -> {
			if (Thread.currentThread().getName().equals("processor-thread-3")) {
				throw new ItemProcessingException("boom!", item);
			}
			System.out.println(Thread.currentThread().getName() + ": processing item = " + item);
			return item * 2;
		});
		itemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor("processor-thread-"));
		return itemProcessor;
	}

	@Bean
	public AsyncItemWriter itemWriter() {
		AsyncItemWriter<Integer> asyncItemWriter = new AsyncItemWriter<>();
		asyncItemWriter.setDelegate(items -> {
			for (Integer item : items) {
				System.out.println("item = " + item);
			}
		});
		return asyncItemWriter;
	}

	@Bean
	public Step step() {
		return steps.get("step")
				.<Integer, Integer>chunk(5)
				.reader(itemReader())
				.processor(itemProcessor())
				.writer(itemWriter())
				.listener((ItemWriteListener) new ItemProcessingExceptionHandler())
				.build();
	}

	@Bean
	public Job job() {
		return jobs.get("job")
				.start(step())
				.build();
	}

	class ItemProcessingException extends Exception {
		private Object item;

		ItemProcessingException(String message, Object item) {
			super(message);
			this.item = item;
		}

		public Object getItem() {
			return item;
		}
	}

	class ItemProcessingExceptionHandler extends ItemListenerSupport<Integer, FutureTask<Integer>> {
		@Override
		public void onWriteError(Exception exception, List<? extends FutureTask<Integer>> items) {
			ItemProcessingException itemProcessingException = (ItemProcessingException) exception; // TODO exception type check
			System.err.println(String.format("Error wile processing item %s due to '%s'",
					itemProcessingException.getItem(),
					itemProcessingException.getMessage()));
		}
	}

	public static void main(String[] args) throws Exception {
		ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
		JobLauncher jobLauncher = context.getBean(JobLauncher.class);
		Job job = context.getBean(Job.class);
		jobLauncher.run(job, new JobParameters());
	}

}

In this example, I created a custom exception to encapsulate the item in error. The ItemProcessingExceptionHandler would be the place where to plug the custom code to execute when an error occurs while processing items asynchronously.

@imod Does this make sense?

BTW, there would a clever way to achieve the requirement using a ErrorHandlingTaskExecutor to intercept processing exceptions thrown in the tasks submitted by the AsyncItemProcessor, something like:

@Bean
public AsyncItemProcessor itemProcessor() {
	AsyncItemProcessor<Integer, Integer> itemProcessor = new AsyncItemProcessor<>();
	itemProcessor.setDelegate(delegate());
	SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); // or any other implementation of TaskExecutor
	itemProcessor.setTaskExecutor(new ErrorHandlingTaskExecutor(taskExecutor, new ItemProcessingErrorHandler()));
	return itemProcessor;
}

Unfortunately, this is not possible because the FutureTask class does not throw exceptions when executed, it stores the exception and then wraps it in ExecutionException and throw it to the caller when get is called.. If this was not the case, using a ErrorHandlingTaskExecutor with the AsyncItemProcessor would be an elegant way to achieve the requirement IMO.

@imod
Copy link
Author

imod commented Nov 9, 2018

@benas this looks good, thanks - would be nice to have this in the official documentation

@fmbenhassine
Copy link
Contributor

@imod ok thank you for your feedback

would be nice to have this in the official documentation

Sure! I think this can be added in the Asynchronous Processors section. If you want, you are welcome to update this PR with documentation changes. Otherwise, I will create another JIRA ticket (other than BATCH-2371) to update the docs.

@fmbenhassine
Copy link
Contributor

I created a JIRA ticket BATCH-2773 to update the documentation with the details of how to handle aynchronous processing errors as explained in the example above. I'm closing this PR.

@imod
Copy link
Author

imod commented Nov 13, 2018

@benas sorry for my late reply - I will try to find time for it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants