Friday, July 5, 2013

Spring Batch - looping over multiple files

Spring Batch - looping over multiple files

here's a quick solution to a scenario coming from this question. essentially, an array of file names is passed in as a job parameter and the job needs to process each of these files. one option is to have the 'looping' part managed by the job orchestrator (it loops, picks a single file, then kicks off the job for that file); the other is to pass the whole array in and loop internally in the job.

which option to choose depends on other aspects of the use case, but here's an example of looping internally in the job.

here's the job definition;




now here's the decider code;

package de.incompleteco.spring.batch.decider;

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class FileDecision implements JobExecutionDecider {

 public static final String INPUT_FILE = "input.file";
 public static final String INPUT_FILES = "input.files";
 public static final String DELIMITER = ",";
 private Queue < String > inputFiles;
 public FlowExecutionStatus decide(JobExecution jobExecution,StepExecution stepExecution) {
  //check if the jobExecution has the input.file in it's context
  if (!jobExecution.getExecutionContext().containsKey(INPUT_FILE)) {
   //build the queue
   inputFiles = new LinkedBlockingQueue < String >(Arrays.asList(jobExecution.getJobParameters().getString(INPUT_FILES).split(DELIMITER)));
  }//end if
  //pop and add
  String file = inputFiles.poll();
  if (file != null) {
   jobExecution.getExecutionContext().put(INPUT_FILE, file);
   return FlowExecutionStatus.UNKNOWN;
  }//end if
  //return 'done'
  return FlowExecutionStatus.COMPLETED;


our helper writer;

package de.incompleteco.spring.batch.item.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class SystemOutItemWriter implements ItemWriter < String > {

 public void write(List items) throws Exception {
  for (String item : items) {
   System.out.println("this is what was received:" + item);
  }//end for


and our resources.



finally, our unit test to see if it all works;

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertFalse;

import java.io.File;
import java.io.FileWriter;

import javax.annotation.Resource;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import de.incompleteco.spring.batch.decider.FileDecision;

public class MultipleFileProcessIntegrationTest {

 public TemporaryFolder folder = new TemporaryFolder();
 private Job job;
 private JobLauncher jobLauncher;
 private JobExplorer jobExplorer;
 public void test() throws Exception {
  //somewhere to hold the filenames
  StringBuilder builder = new StringBuilder();
  //create 3 files
  for (int i=0;i<3 add="" content:="" content="" file="" filename="" filewriter="" i="" if="" some="" test="" testfile="" the="" txt="" write="" writer.close="" writer.flush="" writer.write="" writer="new"> 0) {
   }//end if
   //show it
  }//end loop
  //now build the job parameters
  JobParameters parameters = new JobParametersBuilder().addString(FileDecision.INPUT_FILES,builder.toString()).toJobParameters();
  JobExecution execution =,parameters);
  while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
  }//end while
  execution = jobExplorer.getJobExecution(execution.getId());


No comments:

Post a Comment