SyntaxHighlighter

Tuesday, September 18, 2012

Spring Batch - Simple Async & Polling

Spring Batch - Simple Async & Polling

here's a use case a client came upon; they were going to use a Spring Batch Tasklet to call a service that would do some file processing.  The call to the remote service, if synchronous, would cause a transaction timeout in the Spring Batch world namely because the file processing service would take so long (let's say 10 mins).

to solve this, we looked at the RepeatStatus of the Tasklet as a 'polling-type' solution.  that is, we wouldn't allow the Tasklet to continue until we've gotten back an 'ok' from the service.  The service was then launched as FutureTask which sorted that out.

so, in the name of preserving this solution, here's the example

first the service interface that we want to call from batch

package org.incompletecode.spring.batch.service;
public interface ServiceAdapter {
public void startService();
public boolean isComplete();
public boolean hasStarted();
}
view raw gistfile1.java hosted with ❤ by GitHub


then the implementation

package org.incompletecode.spring.batch.service;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
public class SimpleServiceAdapter implements ServiceAdapter {
private boolean started = false;
private FutureTask<BasicService> future;
private ExecutorService executor;
public void startService() {
future = new FutureTask<BasicService>(new Callable<BasicService>() {
public BasicService call() throws Exception {
BasicService basicService = new BasicService();
//start
basicService.process();
//return
return basicService;
} });
//execute it
executor.execute(future);
//set the value
started = true;
}
public boolean isComplete() {
if (hasStarted()) {
return future.isDone();
}//end if
return false;//default
}
public boolean hasStarted() {
return started;
}
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}
class BasicService {
public void process() throws Exception {
//running in here
//wait for a while
Thread.sleep(60 * 1000);//1 minute
//done
}
}
}
view raw gistfile1.java hosted with ❤ by GitHub


note: the BasicService class inside it is the dummy service

now a very very simple task let (place holder really)

package org.incompletecode.spring.batch.tasklet;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class BasicTasklet implements Tasklet {
public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
//does something here
return RepeatStatus.FINISHED;
}
}
view raw gistfile1.java hosted with ❤ by GitHub


and here's where the 'polling' happens

package org.incompletecode.spring.batch.tasklet;
import org.incompletecode.spring.batch.service.ServiceAdapter;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
public class PollingTasklet implements Tasklet {
@Autowired
private ServiceAdapter serviceAdapter;
@Autowired
private Long pollingPeriod;
public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
// lets check the status
if (!serviceAdapter.hasStarted()) {
//start it
serviceAdapter.startService();
//wait between seeing if it's done
Thread.sleep(pollingPeriod);
//return a state
return RepeatStatus.CONTINUABLE;//polling
} else if (!serviceAdapter.isComplete()) {
//wait between loops
Thread.sleep(pollingPeriod);
//keep polling
return RepeatStatus.CONTINUABLE;
}//end if
//default is to return finished
return RepeatStatus.FINISHED;
}
}
view raw gistfile1.java hosted with ❤ by GitHub


note: there's a pollingInterval set that allows for the tasklet to 'pause' before trying again

now here's the glue part - the batch job definition itself

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
<batch:job-repository/>
<batch:job id="pollingJob">
<batch:step id="pollingJob.step1" next="pollingJob.step2">
<batch:tasklet ref="basicTasklet"/>
</batch:step>
<batch:step id="pollingJob.step2" next="pollingJob.step3">
<batch:tasklet ref="pollingTasklet"/>
</batch:step>
<batch:step id="pollingJob.step3">
<batch:tasklet ref="basicTasklet"/>
</batch:step>
</batch:job>
<bean id="serviceAdapter"
class="org.incompletecode.spring.batch.service.SimpleServiceAdapter">
<property name="executor" ref="taskExecutor"/>
</bean>
<bean id="basicTasklet"
class="org.incompletecode.spring.batch.tasklet.BasicTasklet">
</bean>
<bean id="pollingTasklet"
class="org.incompletecode.spring.batch.tasklet.PollingTasklet">
</bean>
<bean id="jobExplorer"
class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="jobRegistry"
class="org.springframework.batch.core.configuration.support.MapJobRegistry">
</bean>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
<property name="taskExecutor" ref="jobLauncherTaskExecutor"/>
</bean>
<bean id="postProcessor"
class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
<bean id="jobOperator"
class="org.springframework.batch.core.launch.support.SimpleJobOperator">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="jobLauncher" ref="jobLauncher"/>
<property name="jobRegistry" ref="jobRegistry"/>
<property name="jobRepository" ref="jobRepository"/>
</bean>
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<task:executor id="jobLauncherTaskExecutor"/>
<bean id="taskExecutor" class="java.util.concurrent.Executors"
factory-method="newSingleThreadExecutor"
destroy-method="shutdownNow" />
<bean id="pollingInterval" class="java.lang.Long">
<constructor-arg value="500"/>
</bean>
</beans>
view raw gistfile1.xml hosted with ❤ by GitHub


and the resources as well

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd">
<beans profile="dev">
<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool">
<constructor-arg>
<bean class="org.h2.jdbcx.JdbcDataSource">
<property name="URL" value="jdbc:h2:mem:a;DB_CLOSE_DELAY=-1"/>
</bean>
</constructor-arg>
</bean>
</beans>
<beans profile="setupDB">
<jdbc:initialize-database>
<jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/>
</jdbc:initialize-database>
</beans>
</beans>
view raw gistfile1.xml hosted with ❤ by GitHub


finally, here's a unit test to make sure it works

package org.incompletecode.spring.batch;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/batch-context.xml",
"classpath:/META-INF/spring/data-context.xml"})
@ActiveProfiles(profiles={"dev","setupDB"})
public class BatchTest {
@Autowired
private JobOperator jobOperator;
@Autowired
private JobExplorer jobExplorer;
@Test
public void test() throws Exception {
//start the job
Long id = jobOperator.start("pollingJob","runtime=" + System.currentTimeMillis());
//monitor
while (jobExplorer.getJobExecution(id).isRunning()) {
//wait
Thread.sleep(500);// 1/2 a second
}//end while
//test
assertEquals(jobExplorer.getJobExecution(id).getExitStatus().getExitCode(),ExitStatus.COMPLETED.getExitCode());
}
}
view raw gistfile1.java hosted with ❤ by GitHub


and to help get the project going, the maven pom too

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.incompletecode.spring.batch</groupId>
<artifactId>spring-batch-polling</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-batch-polling</name>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>2.1.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.3.165</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
view raw gistfile1.xml hosted with ❤ by GitHub

No comments:

Post a Comment