Spring Batch - Simple Async & Polling
here's a use case a client came upon; they were going to use a Spring Batch Tasklet to call a service that would do some file processing. The call to the remote service, if synchronous, would cause a transaction timeout in the Spring Batch world namely because the file processing service would take so long (let's say 10 mins).
to solve this, we looked at the RepeatStatus of the Tasklet as a 'polling-type' solution. that is, we wouldn't allow the Tasklet to continue until we've gotten back an 'ok' from the service. The service was then launched as FutureTask which sorted that out.
so, in the name of preserving this solution, here's the example
first the service interface that we want to call from batch
then the implementation
note: the BasicService class inside it is the dummy service
now a very very simple task let (place holder really)
and here's where the 'polling' happens
note: there's a pollingInterval set that allows for the tasklet to 'pause' before trying again
now here's the glue part - the batch job definition itself
and the resources as well
finally, here's a unit test to make sure it works
and to help get the project going, the maven pom too
first the service interface that we want to call from batch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.incompletecode.spring.batch.service; | |
public interface ServiceAdapter { | |
public void startService(); | |
public boolean isComplete(); | |
public boolean hasStarted(); | |
} |
then the implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.incompletecode.spring.batch.service; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.FutureTask; | |
public class SimpleServiceAdapter implements ServiceAdapter { | |
private boolean started = false; | |
private FutureTask<BasicService> future; | |
private ExecutorService executor; | |
public void startService() { | |
future = new FutureTask<BasicService>(new Callable<BasicService>() { | |
public BasicService call() throws Exception { | |
BasicService basicService = new BasicService(); | |
//start | |
basicService.process(); | |
//return | |
return basicService; | |
} }); | |
//execute it | |
executor.execute(future); | |
//set the value | |
started = true; | |
} | |
public boolean isComplete() { | |
if (hasStarted()) { | |
return future.isDone(); | |
}//end if | |
return false;//default | |
} | |
public boolean hasStarted() { | |
return started; | |
} | |
public void setExecutor(ExecutorService executor) { | |
this.executor = executor; | |
} | |
class BasicService { | |
public void process() throws Exception { | |
//running in here | |
//wait for a while | |
Thread.sleep(60 * 1000);//1 minute | |
//done | |
} | |
} | |
} |
note: the BasicService class inside it is the dummy service
now a very very simple task let (place holder really)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.incompletecode.spring.batch.tasklet; | |
import org.springframework.batch.core.StepContribution; | |
import org.springframework.batch.core.scope.context.ChunkContext; | |
import org.springframework.batch.core.step.tasklet.Tasklet; | |
import org.springframework.batch.repeat.RepeatStatus; | |
public class BasicTasklet implements Tasklet { | |
public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception { | |
//does something here | |
return RepeatStatus.FINISHED; | |
} | |
} |
and here's where the 'polling' happens
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.incompletecode.spring.batch.tasklet; | |
import org.incompletecode.spring.batch.service.ServiceAdapter; | |
import org.springframework.batch.core.StepContribution; | |
import org.springframework.batch.core.scope.context.ChunkContext; | |
import org.springframework.batch.core.step.tasklet.Tasklet; | |
import org.springframework.batch.repeat.RepeatStatus; | |
import org.springframework.beans.factory.annotation.Autowired; | |
public class PollingTasklet implements Tasklet { | |
@Autowired | |
private ServiceAdapter serviceAdapter; | |
@Autowired | |
private Long pollingPeriod; | |
public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception { | |
// lets check the status | |
if (!serviceAdapter.hasStarted()) { | |
//start it | |
serviceAdapter.startService(); | |
//wait between seeing if it's done | |
Thread.sleep(pollingPeriod); | |
//return a state | |
return RepeatStatus.CONTINUABLE;//polling | |
} else if (!serviceAdapter.isComplete()) { | |
//wait between loops | |
Thread.sleep(pollingPeriod); | |
//keep polling | |
return RepeatStatus.CONTINUABLE; | |
}//end if | |
//default is to return finished | |
return RepeatStatus.FINISHED; | |
} | |
} |
note: there's a pollingInterval set that allows for the tasklet to 'pause' before trying again
now here's the glue part - the batch job definition itself
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<beans xmlns="http://www.springframework.org/schema/beans" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xmlns:batch="http://www.springframework.org/schema/batch" | |
xmlns:task="http://www.springframework.org/schema/task" | |
xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd | |
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd | |
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd"> | |
<batch:job-repository/> | |
<batch:job id="pollingJob"> | |
<batch:step id="pollingJob.step1" next="pollingJob.step2"> | |
<batch:tasklet ref="basicTasklet"/> | |
</batch:step> | |
<batch:step id="pollingJob.step2" next="pollingJob.step3"> | |
<batch:tasklet ref="pollingTasklet"/> | |
</batch:step> | |
<batch:step id="pollingJob.step3"> | |
<batch:tasklet ref="basicTasklet"/> | |
</batch:step> | |
</batch:job> | |
<bean id="serviceAdapter" | |
class="org.incompletecode.spring.batch.service.SimpleServiceAdapter"> | |
<property name="executor" ref="taskExecutor"/> | |
</bean> | |
<bean id="basicTasklet" | |
class="org.incompletecode.spring.batch.tasklet.BasicTasklet"> | |
</bean> | |
<bean id="pollingTasklet" | |
class="org.incompletecode.spring.batch.tasklet.PollingTasklet"> | |
</bean> | |
<bean id="jobExplorer" | |
class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"> | |
<property name="dataSource" ref="dataSource"/> | |
</bean> | |
<bean id="jobRegistry" | |
class="org.springframework.batch.core.configuration.support.MapJobRegistry"> | |
</bean> | |
<bean id="jobLauncher" | |
class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> | |
<property name="jobRepository" ref="jobRepository"/> | |
<property name="taskExecutor" ref="jobLauncherTaskExecutor"/> | |
</bean> | |
<bean id="postProcessor" | |
class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor"> | |
<property name="jobRegistry" ref="jobRegistry"/> | |
</bean> | |
<bean id="jobOperator" | |
class="org.springframework.batch.core.launch.support.SimpleJobOperator"> | |
<property name="jobExplorer" ref="jobExplorer"/> | |
<property name="jobLauncher" ref="jobLauncher"/> | |
<property name="jobRegistry" ref="jobRegistry"/> | |
<property name="jobRepository" ref="jobRepository"/> | |
</bean> | |
<bean id="transactionManager" | |
class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> | |
<property name="dataSource" ref="dataSource"/> | |
</bean> | |
<task:executor id="jobLauncherTaskExecutor"/> | |
<bean id="taskExecutor" class="java.util.concurrent.Executors" | |
factory-method="newSingleThreadExecutor" | |
destroy-method="shutdownNow" /> | |
<bean id="pollingInterval" class="java.lang.Long"> | |
<constructor-arg value="500"/> | |
</bean> | |
</beans> |
and the resources as well
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<beans xmlns="http://www.springframework.org/schema/beans" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xmlns:jdbc="http://www.springframework.org/schema/jdbc" | |
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd | |
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd"> | |
<beans profile="dev"> | |
<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool"> | |
<constructor-arg> | |
<bean class="org.h2.jdbcx.JdbcDataSource"> | |
<property name="URL" value="jdbc:h2:mem:a;DB_CLOSE_DELAY=-1"/> | |
</bean> | |
</constructor-arg> | |
</bean> | |
</beans> | |
<beans profile="setupDB"> | |
<jdbc:initialize-database> | |
<jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/> | |
</jdbc:initialize-database> | |
</beans> | |
</beans> |
finally, here's a unit test to make sure it works
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.incompletecode.spring.batch; | |
import static org.junit.Assert.assertEquals; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.batch.core.ExitStatus; | |
import org.springframework.batch.core.explore.JobExplorer; | |
import org.springframework.batch.core.launch.JobOperator; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.test.context.ActiveProfiles; | |
import org.springframework.test.context.ContextConfiguration; | |
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; | |
@RunWith(SpringJUnit4ClassRunner.class) | |
@ContextConfiguration({"classpath:/META-INF/spring/batch-context.xml", | |
"classpath:/META-INF/spring/data-context.xml"}) | |
@ActiveProfiles(profiles={"dev","setupDB"}) | |
public class BatchTest { | |
@Autowired | |
private JobOperator jobOperator; | |
@Autowired | |
private JobExplorer jobExplorer; | |
@Test | |
public void test() throws Exception { | |
//start the job | |
Long id = jobOperator.start("pollingJob","runtime=" + System.currentTimeMillis()); | |
//monitor | |
while (jobExplorer.getJobExecution(id).isRunning()) { | |
//wait | |
Thread.sleep(500);// 1/2 a second | |
}//end while | |
//test | |
assertEquals(jobExplorer.getJobExecution(id).getExitStatus().getExitCode(),ExitStatus.COMPLETED.getExitCode()); | |
} | |
} |
and to help get the project going, the maven pom too
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>org.incompletecode.spring.batch</groupId> | |
<artifactId>spring-batch-polling</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<name>spring-batch-polling</name> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.batch</groupId> | |
<artifactId>spring-batch-core</artifactId> | |
<version>2.1.9.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-test</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-tx</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-aop</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-beans</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-core</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-context</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-jdbc</artifactId> | |
<version>3.1.0.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>com.h2database</groupId> | |
<artifactId>h2</artifactId> | |
<version>1.3.165</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>4.10</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
</project> |
No comments:
Post a Comment