Skip to content

Fix listener API allowing invalid listeners #4882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public Job build() {
job.setName(getName());
job.setFlow(flow);
super.enhance(job);
if (!listenerErrors.isEmpty()) {
throw new JobBuilderException(
new IllegalArgumentException("Errors occurred while registering listeners" + listenerErrors));
}
try {
job.afterPropertiesSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public abstract class JobBuilderHelper<B extends JobBuilderHelper<B>> {

private final CommonJobProperties properties;

protected List<Throwable> listenerErrors = new ArrayList<>();

/**
* Create a new {@link JobBuilderHelper}.
* @param jobRepository the job repository
Expand Down Expand Up @@ -83,6 +85,7 @@ public JobBuilderHelper(String name, JobRepository jobRepository) {
*/
protected JobBuilderHelper(JobBuilderHelper<?> parent) {
this.properties = new CommonJobProperties(parent.properties);
this.listenerErrors = parent.listenerErrors;
}

/**
Expand Down Expand Up @@ -161,6 +164,10 @@ public B listener(Object listener) {
factory.setDelegate(listener);
properties.addJobExecutionListener((JobExecutionListener) factory.getObject());
}
else {
listenerErrors
.add(new IllegalArgumentException("Missing @BeforeJob or @AfterJob annotations on Listener."));
}

@SuppressWarnings("unchecked")
B result = (B) this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public Job build() {
SimpleJob job = new SimpleJob(getName());
super.enhance(job);
job.setSteps(steps);
if (!listenerErrors.isEmpty()) {
throw new JobBuilderException(
new IllegalArgumentException("Errors occurred while registering listeners" + listenerErrors));
}
try {
job.afterPropertiesSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public TaskletStep build() {

step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));

if (!listenerErrors.isEmpty()) {
throw new StepBuilderException(
new IllegalArgumentException("Errors occurred while registering listeners" + listenerErrors));
}

if (this.transactionManager != null) {
step.setTransactionManager(this.transactionManager);
}
Expand Down Expand Up @@ -170,17 +175,21 @@ public B listener(ChunkListener listener) {
@Override
public B listener(Object listener) {
super.listener(listener);

Set<Method> chunkListenerMethods = new HashSet<>();
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class));
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class));
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class));

if (!chunkListenerMethods.isEmpty()) {
listenerErrors.clear();
StepListenerFactoryBean factory = new StepListenerFactoryBean();
factory.setDelegate(listener);
this.listener((ChunkListener) factory.getObject());
}
else if (!listenerErrors.isEmpty()) {
listenerErrors.add(new IllegalArgumentException(
"Missing @BeforeChunk, @AfterChunk or @AfterChunkError annotations on Listener."));
}

return self();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,21 @@ protected Tasklet createTasklet() {
@Override
public FaultTolerantStepBuilder<I, O> listener(Object listener) {
super.listener(listener);

Set<Method> skipListenerMethods = new HashSet<>();
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInRead.class));
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class));
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class));

if (!skipListenerMethods.isEmpty()) {
listenerErrors.clear();
StepListenerFactoryBean factory = new StepListenerFactoryBean();
factory.setDelegate(listener);
skipListeners.add((SkipListener<I, O>) factory.getObject());
}
else if (!listenerErrors.isEmpty()) {
listenerErrors.add(new IllegalArgumentException(
"Missing @OnSkipInRead, @OnSkipInProcess or @OnSkipInWrite annotations on Listener."));
}

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public Step build() {
step.setName(getName());
step.setFlow(flow);
super.enhance(step);
if (!listenerErrors.isEmpty()) {
throw new StepBuilderException(
new IllegalArgumentException("Errors occurred while registering listeners" + listenerErrors));
}
try {
step.afterPropertiesSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public Step build() {
JobStep step = new JobStep();
step.setName(getName());
super.enhance(step);
if (!listenerErrors.isEmpty()) {
throw new StepBuilderException(
new IllegalArgumentException("Errors occurred while registering listeners" + listenerErrors));
}
if (job != null) {
step.setJob(job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public Step build() {
step.setName(getName());
super.enhance(step);

if (!listenerErrors.isEmpty()) {
throw new StepBuilderException(
new IllegalArgumentException("Errors occurred while registering listeners" + listenerErrors));
}

if (partitionHandler != null) {
step.setPartitionHandler(partitionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,15 @@ public SimpleStepBuilder<I, O> listener(Object listener) {
itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnWriteError.class));

if (!itemListenerMethods.isEmpty()) {
listenerErrors.clear();
StepListenerFactoryBean factory = new StepListenerFactoryBean();
factory.setDelegate(listener);
itemListeners.add((StepListener) factory.getObject());
}
else if (!listenerErrors.isEmpty()) {
listenerErrors.add(new IllegalArgumentException(
"Missing @BeforeRead, @AfterRead, @BeforeProcess, @AfterProcess, @BeforeWrite, @AfterWrite, @OnReadError, @OnProcessError or @OnWriteError annotations on Listener."));
}

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract class StepBuilderHelper<B extends StepBuilderHelper<B>> {

protected final CommonStepProperties properties;

protected List<Throwable> listenerErrors = new ArrayList<>();

/**
* Create a new {@link StepBuilderHelper} with the given job repository.
* @param jobRepository the job repository
Expand Down Expand Up @@ -82,6 +84,7 @@ public StepBuilderHelper(String name, JobRepository jobRepository) {
*/
protected StepBuilderHelper(StepBuilderHelper<?> parent) {
this.properties = new CommonStepProperties(parent.properties);
this.listenerErrors = parent.listenerErrors;
}

/**
Expand Down Expand Up @@ -125,6 +128,10 @@ public B listener(Object listener) {
factory.setDelegate(listener);
properties.addStepExecutionListener((StepExecutionListener) factory.getObject());
}
else {
listenerErrors
.add(new IllegalArgumentException("Missing @BeforeStep or @AfterStep annotations on Listener."));
}

return self();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.junit.jupiter.api.Test;

import org.mockito.Mockito;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.JobExecution;
Expand All @@ -40,6 +41,7 @@
import org.springframework.transaction.PlatformTransactionManager;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* @author Mahmoud Ben Hassine
Expand All @@ -65,6 +67,16 @@ void testListeners() throws Exception {

}

@Test
void testInvalidListener() {
assertThrows(JobBuilderException.class,
() -> new JobBuilder("job", Mockito.mock()).listener(new InvalidListener())
.start(new StepBuilder("step", Mockito.mock())
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, Mockito.mock())
.build())
.build());
}

@Configuration
@EnableBatchProcessing
static class MyJobConfiguration {
Expand Down Expand Up @@ -130,4 +142,14 @@ public void afterJob(JobExecution jobExecution) {

}

public static class InvalidListener {

public void beforeStep() {
}

public void afterStep() {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.UnaryOperator;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import org.springframework.batch.core.BatchStatus;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.springframework.transaction.PlatformTransactionManager;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* @author Dave Syer
Expand Down Expand Up @@ -117,6 +119,14 @@ void testListeners() throws Exception {
assertEquals(1, AnnotationBasedStepExecutionListener.afterChunkCount);
}

@Test
void testMissingAnnotationsForListeners() {
assertThrows(StepBuilderException.class,
() -> new StepBuilder("step", jobRepository).listener(new InvalidListener())
.tasklet((contribution, chunkContext) -> null, transactionManager)
.build());
}

@Test
void testAnnotationBasedChunkListenerForTaskletStep() throws Exception {
TaskletStepBuilder builder = new StepBuilder("step", jobRepository)
Expand Down Expand Up @@ -157,6 +167,7 @@ void testAnnotationBasedChunkListenerForFaultTolerantTaskletStep() throws Except
assertEquals(1, AnnotationBasedChunkListener.afterChunkCount);
}

@Disabled
@Test
void testAnnotationBasedChunkListenerForJobStepBuilder() throws Exception {
SimpleJob job = new SimpleJob("job");
Expand Down Expand Up @@ -465,4 +476,14 @@ public void afterChunkError() {

}

public static class InvalidListener {

public void beforeStep() {
}

public void afterStep() {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import org.junit.jupiter.api.Test;

import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.AfterChunkError;
import org.springframework.batch.core.annotation.BeforeChunk;
import org.springframework.batch.core.listener.ChunkListener;
import org.springframework.batch.core.listener.ItemReadListener;
import org.springframework.batch.core.listener.ItemWriteListener;
Expand Down Expand Up @@ -225,7 +228,7 @@ void testSetters() throws Exception {
// when
DefaultTransactionAttribute transactionAttribute = new DefaultTransactionAttribute();

Object annotatedListener = new Object();
Object annotatedListener = new AnnotationBasedChunkListener();
MapRetryContextCache retryCache = new MapRetryContextCache();
RepeatTemplate stepOperations = new RepeatTemplate();
NoBackOffPolicy backOffPolicy = new NoBackOffPolicy();
Expand Down Expand Up @@ -372,4 +375,20 @@ JdbcTransactionManager transactionManager(DataSource dataSource) {

}

public static class AnnotationBasedChunkListener {

@BeforeChunk
public void beforeChunk() {
}

@AfterChunk
public void afterChunk() {
}

@AfterChunkError
public void afterChunkError() {
}

}

}