SyntaxHighlighter

Thursday, July 18, 2013

Spring Batch - How to stop simultaneous execution of a Job

Spring Batch - How to stop simultaneous execution of a Job


so here's the use case; Job A can only be "in-flight"/running one at a time.  That is, for business or resource reasons, no matter if the job parameters are different, only one instance of the Job can be running at a time.

here's a quick AOP solution

first our class;
package de.incompleteco.spring.batch.support;

import java.util.List;
import java.util.Set;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

public class SimultaneousJobAspect implements MethodInterceptor, InitializingBean {

 private static final int JOB_INDEX = 0;
 
 //list of job names that SHOULD NOT run simultaneously
 private List jobNames;
 
 private JobExplorer jobExplorer;
 
 @Override
 public Object invoke(MethodInvocation invocation) throws Throwable {
  //get the job names
  Object[] arguments = invocation.getArguments();
  //get the 'job' argument (argument 0)
  Job job = (Job) arguments[JOB_INDEX];
  //get the name
  for (String jobName : jobNames) {
   if (jobName.equalsIgnoreCase(job.getName())) {
    //check if there's one running
    Set jobExecutions = jobExplorer.findRunningJobExecutions(jobName);
    if (jobExecutions != null && !jobExecutions.isEmpty()) {
     //have a match --> throw a job exception
     throw new JobExecutionAlreadyRunningException(jobName + " is already running and can't be run simultaneously");
    }//end if
   }//end if
  }//end for
  //continue
  return invocation.proceed();
 }

 public void setJobNames(List jobNames) {
  this.jobNames = jobNames;
 }

 public void setJobExplorer(JobExplorer jobExplorer) {
  this.jobExplorer = jobExplorer;
 }

 @Override
 public void afterPropertiesSet() throws Exception {
  Assert.notNull(jobExplorer);
 }
 
}


then the aspect configuration (using the namespace)
 
  
 


 
  
  
   
    longJob
   
  
  

now, for testing, the test class
package de.incompleteco.spring.batch;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
public class SimultaneousJobAspectIntegrationTest {

 @Resource
 private JobLauncher jobLauncher;
 
 @Resource
 private Job simpleJob;
 
 @Resource
 private Job longJob;
 
 @Resource
 private JobExplorer jobExplorer;
 
 @Test
 public void test() throws Exception {
  //start  a simple job
  JobParameters jobParameters = new JobParametersBuilder().addLong("runtime",System.currentTimeMillis()).toJobParameters();
  JobExecution execution = jobLauncher.run(simpleJob,jobParameters);
  //monitor
  while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
   Thread.sleep(100);
  }//end while
  //load
  execution = jobExplorer.getJobExecution(execution.getId());
  //check
  assertFalse(execution.getStatus().isUnsuccessful());
  //now start a long job
  execution = jobLauncher.run(longJob,jobParameters);
  //start another one with different parameters
  jobParameters = new JobParametersBuilder().addLong("runtime",System.currentTimeMillis()).toJobParameters();
  try {
   execution = jobLauncher.run(longJob,jobParameters);
   fail("should've failed");
  }
  catch (JobExecutionAlreadyRunningException e) { }
  
 }
 
}


and our job and batch configs
 
  
   
    
     
      
       
      
     
     
    
   
  
 

 
  
   
    
   
  
 

 
 
 
  
 
 
 

 
  
  
 
 
 

 
 

oh, and the class that does the delay;
package de.incompleteco.spring.batch.tasklet;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class LongTasklet implements Tasklet {

 @Override
 public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
  
  Thread.sleep(10 * 1000);// wait for 10 seconds
  
  return RepeatStatus.FINISHED;
 }

}

No comments:

Post a Comment