SyntaxHighlighter

Friday, July 5, 2013

Spring Batch - looping over multiple files

Spring Batch - looping over multiple files


here's a quick solution to a scenario coming from this question.  essentially, an array of file names will be passed in as a job parameter and the job needs to process each of these files.  one option is to have the 'looping' part managed by the job orchestrator (it loops, gets a single file, then kicks off the job for that file) the other is to pass the array in, and then loop internally in the job.

which option to choose would be dependent on other aspects of the use case, but here's an example of looping internally in the job

here's the job definition;



 
  
   
   
  
  
   
    
   
  
  
   
    
      
  
 
 

 
  
  
 
 
 


 
 
 
 
 
 
 



now here's the decider code;

package de.incompleteco.spring.batch.decider;

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class FileDecision implements JobExecutionDecider {

 public static final String INPUT_FILE = "input.file";
 public static final String INPUT_FILES = "input.files";
 public static final String DELIMITER = ",";
 
 private Queue < String > inputFiles;
 
 @Override
 public FlowExecutionStatus decide(JobExecution jobExecution,StepExecution stepExecution) {
  //check if the jobExecution has the input.file in it's context
  if (!jobExecution.getExecutionContext().containsKey(INPUT_FILE)) {
   //build the queue
   inputFiles = new LinkedBlockingQueue < String >(Arrays.asList(jobExecution.getJobParameters().getString(INPUT_FILES).split(DELIMITER)));
  }//end if
  //pop and add
  String file = inputFiles.poll();
  if (file != null) {
   jobExecution.getExecutionContext().put(INPUT_FILE, file);
   return FlowExecutionStatus.UNKNOWN;
  }//end if
  //return 'done'
  return FlowExecutionStatus.COMPLETED;
 }

}


our helper writer;

package de.incompleteco.spring.batch.item.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class SystemOutItemWriter implements ItemWriter < String > {

 @Override
 public void write(List items) throws Exception {
  for (String item : items) {
   System.out.println("this is what was received:" + item);
  }//end for
 }

}


and our resources.



 
 
 
  
 
 
 

 
  
  
 
 
 



finally, our unit test to see if it all works;

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertFalse;

import java.io.File;
import java.io.FileWriter;

import javax.annotation.Resource;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import de.incompleteco.spring.batch.decider.FileDecision;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:/META-INF/spring/*-context.xml")
public class MultipleFileProcessIntegrationTest {

 @Rule
 public TemporaryFolder folder = new TemporaryFolder();
 
 @Resource
 private Job job;
 
 @Resource
 private JobLauncher jobLauncher;
 
 @Resource
 private JobExplorer jobExplorer;
 
 @Test
 public void test() throws Exception {
  //somewhere to hold the filenames
  StringBuilder builder = new StringBuilder();
  //create 3 files
  for (int i=0;i<3 add="" content:="" content="" file="" filename="" filewriter="" i="" if="" some="" test="" testfile="" the="" txt="" write="" writer.close="" writer.flush="" writer.write="" writer="new"> 0) {
    builder.append(FileDecision.DELIMITER);
   }//end if
   builder.append(file.getAbsolutePath());
   //show it
   System.out.println(file.getAbsolutePath());
  }//end loop
  
  
  //now build the job parameters
  JobParameters parameters = new JobParametersBuilder().addString(FileDecision.INPUT_FILES,builder.toString()).toJobParameters();
  //execution
  JobExecution execution = jobLauncher.run(job,parameters);
  //check
  while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
   Thread.sleep(100);
  }//end while
  //load
  execution = jobExplorer.getJobExecution(execution.getId());
  //check
  assertFalse(execution.getStatus().isUnsuccessful());
 }
 
}

.

No comments:

Post a Comment