Friday, July 5, 2013

Spring Batch - looping over multiple files


here's a quick solution to a scenario coming from this question.  essentially, an array of file names is passed in as a job parameter, and the job needs to process each of those files.  one option is to have the 'looping' part managed by the job orchestrator (it loops, picks a single file, then kicks off the job for that file); the other is to pass the whole array in and loop internally in the job.

which option to choose depends on other aspects of the use case, but here's an example of looping internally in the job.

here's the job definition;
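(the original XML didn't survive the import; here's a minimal sketch of what the decider-driven flow could look like.  the bean ids, step name, and reader/writer wiring are my assumptions, not the original config.)

```xml
<batch:job id="multipleFileProcess">
    <!-- the decider pops the next file name into the execution context -->
    <batch:decision id="fileDecision" decider="fileDecisionBean">
        <!-- UNKNOWN means 'a file was popped' - run the step, then decide again -->
        <batch:next on="UNKNOWN" to="multipleFileProcess.step1"/>
        <batch:end on="COMPLETED"/>
    </batch:decision>
    <batch:step id="multipleFileProcess.step1" next="fileDecision">
        <batch:tasklet>
            <batch:chunk reader="reader" writer="writer" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>

<bean id="fileDecisionBean" class="de.incompleteco.spring.batch.decider.FileDecision"/>

<!-- step-scoped so the resource late-binds to the 'input.file' context entry -->
<bean id="reader" scope="step" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="file://#{jobExecutionContext['input.file']}"/>
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
    </property>
</bean>

<bean id="writer" class="de.incompleteco.spring.batch.item.writer.SystemOutItemWriter"/>
```

the important bit is the step-scoped reader: each time the decider puts a new 'input.file' into the execution context, a fresh reader is created against that file.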




now here's the decider code;

package de.incompleteco.spring.batch.decider;

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class FileDecision implements JobExecutionDecider {

 public static final String INPUT_FILE = "input.file";
 public static final String INPUT_FILES = "input.files";
 public static final String DELIMITER = ",";

 private Queue<String> inputFiles;

 public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
  //check if the jobExecution has the input.file in its context
  if (!jobExecution.getExecutionContext().containsKey(INPUT_FILE)) {
   //first pass - build the queue from the comma separated job parameter
   inputFiles = new LinkedBlockingQueue<String>(Arrays.asList(jobExecution.getJobParameters().getString(INPUT_FILES).split(DELIMITER)));
  }//end if
  //pop and add
  String file = inputFiles.poll();
  if (file != null) {
   jobExecution.getExecutionContext().put(INPUT_FILE, file);
   return FlowExecutionStatus.UNKNOWN;
  }//end if
  //queue drained - return 'done'
  return FlowExecutionStatus.COMPLETED;
 }//end decide

}
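stripped of the Spring types, the decider is just a drain-the-queue pattern; a tiny stand-alone sketch of that behaviour (the class and method names here are mine, not the blog's):

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class FileQueueDemo {

    // splits the comma separated parameter and counts how many files get 'processed'
    public static int drain(String files) {
        Queue<String> queue = new LinkedBlockingQueue<String>(Arrays.asList(files.split(",")));
        int processed = 0;
        // poll() hands back null once the queue is empty - the decider's COMPLETED case
        while (queue.poll() != null) {
            processed++; // the real job would run step1 against the popped file here
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("processed " + drain("a.txt,b.txt,c.txt") + " files");
    }
}
```

each decide() call corresponds to one trip around that loop; the job's flow just replays the decision until the queue is empty.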


our helper writer;

package de.incompleteco.spring.batch.item.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class SystemOutItemWriter implements ItemWriter<String> {

 public void write(List<? extends String> items) throws Exception {
  for (String item : items) {
   System.out.println("this is what was received:" + item);
  }//end for
 }//end write

}


and our resources.
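(the resource files didn't survive either; purely as an assumption about what they contained, a minimal in-memory batch infrastructure that would support the asynchronous launch-and-poll in the test below might look like this.)

```xml
<!-- in-memory job repository, launcher and explorer - enough to run the test -->
<bean id="jobRepository" class="">
    <property name="transactionManager" ref="transactionManager"/>
</bean>

<bean id="transactionManager" class=""/>

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository"/>
    <!-- async so the test can poll the explorer while the job runs -->
    <property name="taskExecutor">
        <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
    </property>
</bean>

<bean id="jobExplorer" class="">
    <property name="repositoryFactory" ref="&amp;jobRepository"/>
</bean>
```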



finally, our unit test to see if it all works;

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertFalse;

import java.io.File;
import java.io.FileWriter;

import javax.annotation.Resource;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import de.incompleteco.spring.batch.decider.FileDecision;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration //context file location not shown in the original post
public class MultipleFileProcessIntegrationTest {

 @Rule
 public TemporaryFolder folder = new TemporaryFolder();

 @Resource
 private Job job;

 @Resource
 private JobLauncher jobLauncher;

 @Resource
 private JobExplorer jobExplorer;

 @Test
 public void test() throws Exception {
  //somewhere to hold the filenames
  StringBuilder builder = new StringBuilder();
  //create 3 files
  for (int i = 0; i < 3; i++) {
   //create the file and write some test content
   File file = folder.newFile("testFile" + i + ".txt");
   FileWriter writer = new FileWriter(file);
   writer.write("some test content: " + i);
   writer.flush();
   writer.close();
   //comma separate the filenames
   if (builder.length() > 0) {
    builder.append(FileDecision.DELIMITER);
   }//end if
   builder.append(file.getAbsolutePath());
  }//end loop
  //now build the job parameters
  JobParameters parameters = new JobParametersBuilder().addString(FileDecision.INPUT_FILES, builder.toString()).toJobParameters();
  JobExecution execution =, parameters);
  //wait for the (asynchronously launched) job to finish
  while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
   Thread.sleep(100);
  }//end while
  execution = jobExplorer.getJobExecution(execution.getId());
  assertFalse(execution.getStatus().isUnsuccessful());
 }//end test

}

1 comment:

  1. Here is a proper description of the problem:

    Actually the code is similar to the one in the posted link above.
    The only difference is that I am passing the "input.file" to FlatFileOutputWriter
    instead of FlatFileItemWriter.

    In this code the following is being done :

    1. We pass the job parameter "input.files" as a comma separated list of file names.
    2. In the FileDecision class we retrieve the "input.files" parameter, split it
    into a list of strings and iterate over it. In each iteration, we put a new parameter
    "input.file" in the jobExecutionContext.
    3. In the Spring bean declaration file (xml file), we pass the value of this
    "input.file" (in the jobExecutionContext) to a FlatFileItemWriter (as the value of
    its resource property).
    4. Then we pass this FlatFileItemWriter to a job step (multipleFileProcess.step1)
    which is being iterated over. We expected that in each iteration, we set a new value
    for "input.file" (without the 's' at the end), and we would be able to write
    the content to a new output file.

    However, instead of writing to separate output
    files, it was writing the contents of all the iterations to a single output
    file (the output file which was set on the FlatFileItemWriter in the first iteration).

    The solution I implemented:

    1. Created a CustomItemWriter and injected it into job step (multipleFileProcess.step1)
    which is being iterated over.
    2. Injected the MultiResourceItemWriter to the CustomItemWriter.
    3. Injected the FlatFileItemWriter into the MultiResourceItemWriter with resource value="".
    4. In the CustomItemWriter @BeforeStep method,
    a. created a new Resource with file value retrieved from "input.file" (in jobExecutionContext).
    b. set this new Resource to the MultiResourceItemWriter instance variable.

    Now, I am able to write to a separate output file in each iteration.
    I will post the sample code also after the weekend.