Closed
Description
Bug description
JobOperator#stop
can not stop JobExecution correctly in the case that the stop command is executed after step finish but before job finish. JobExecute will ends up with status STOPPING
instead of STOPPED
.
Since STOPPING
is a running state, so I think this stop command does not stop JobExecution correctly.
Steps to reproduce
- Build a tasklet or chunk step with an
ItemStream
being set usingAbstractTaskletStepBuilder#stream
. TheItemStream
should overrideclose
method. - Build a simple job with this step.
- Launch the job in one thread.
- Stop the job execution in another thread using
JobOperator#stop
. Make sure this method execute after step finish but before job finish. (You can refer to the method in below example to do this)
Expected behavior
JobExecution ends up with status STOPPED
.
Actual behavior
JobExecution ends up with status STOPPING
.
Minimal Complete Reproducible example
This following example can be run under src/test/java/org/springframework/batch/test
in spring-batch-test module.
package org.springframework.batch.test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.sql.DataSource;
import org.junit.Test;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import static org.junit.Assert.assertEquals;
public class StopOperatorTests {
static JobOperator jobOperator;
static CountDownLatch startStopLatch = new CountDownLatch(1);
static CountDownLatch finishStopLatch = new CountDownLatch(1);
/**
* Use two signal to simulate the case that the stop command executed after step finish but before job finish.
*
* @throws Exception
*/
@Test
public void testStop() throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(StopJobConfiguration.class);
JobLauncherTestUtils testUtils = context.getBean(JobLauncherTestUtils.class);
jobOperator = context.getBean(JobOperator.class);
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Launch job in one thread
Future<JobExecution> future = executorService.submit(() -> {
try {
return testUtils.launchJob();
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
// Stop this job execution in another thread
executorService.submit(() -> {
try {
// Wait startSop signal
startStopLatch.await();
jobOperator.stop(0L);
// Send finishStop signal
finishStopLatch.countDown();
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
// Expected: this job execution can be stopped
assertEquals(BatchStatus.STOPPED, future.get().getStatus());
}
@Configuration
@EnableBatchProcessing
static class StopJobConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public JobRegistry jobRegistry() {
return new MapJobRegistry();
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry());
return jobRegistryBeanPostProcessor;
}
@Bean
public JobOperator jobOperator(JobLauncher jobLauncher, JobRepository jobRepository,
JobExplorer jobExplorer, JobRegistry jobRegistry) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobLauncher(jobLauncher);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobRepository(jobRepository);
return jobOperator;
}
@Bean
public Step step() {
return stepBuilderFactory.get("step")
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
.stream(new MockStream())
.build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job").start(step()).build();
}
@Bean
public JobLauncherTestUtils testUtils() {
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
jobLauncherTestUtils.setJob(job());
return jobLauncherTestUtils;
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.generateUniqueName(true)
.build();
}
}
static class MockStream extends ItemStreamSupport {
@Override
public void close() {
try {
// Send startStop signal
startStopLatch.countDown();
// Wait finishStop signal
finishStopLatch.await();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
Or refer to this link to see the source code.
Run this test should generate following output.
java.lang.AssertionError:
Expected :STOPPED
Actual :STOPPING