SyntaxHighlighter

Sunday, July 14, 2013

Spring Batch Job as Neo4J Graph

Spring Batch Job as Neo4J Graph


Here's a quick snippet showing how to turn a Spring Batch Job object into a Neo4J graph representation.

first the node structure which looks something like this;



a Job;

package de.incompleteco.spring.batch.graph.domain;

import java.io.Serializable;

import org.springframework.data.neo4j.annotation.GraphId;
import org.springframework.data.neo4j.annotation.Indexed;
import org.springframework.data.neo4j.annotation.NodeEntity;

/**
 * Node entity describing a batch job: it carries the job's name and a
 * reference to the {@link StepNode} the job starts with.
 */
@NodeEntity
public class JobNode implements Serializable {

 private static final long serialVersionUID = 1L;

 /** Identifier field, marked with {@code @GraphId}. */
 @GraphId
 private Long id;

 /** Name of the job; marked {@code @Indexed}. */
 @Indexed
 private String name;

 /** First step node of the job. */
 private StepNode firstNode;

 /** Creates an empty job node; all properties are left unset. */
 public JobNode() {
  super();
 }

 /**
  * Creates a job node with the given name.
  *
  * @param name the job name
  */
 public JobNode(String name) {
  super();
  this.name = name;
 }

 /** @return the identifier, or {@code null} if none has been set */
 public Long getId() {
  return this.id;
 }

 /** @param id the identifier to set */
 public void setId(Long id) {
  this.id = id;
 }

 /** @return the job name */
 public String getName() {
  return this.name;
 }

 /** @param name the job name to set */
 public void setName(String name) {
  this.name = name;
 }

 /** @return the first step node of the job, or {@code null} if unset */
 public StepNode getFirstNode() {
  return this.firstNode;
 }

 /** @param firstNode the step node the job starts with */
 public void setFirstNode(StepNode firstNode) {
  this.firstNode = firstNode;
 }

}

a Step;
package de.incompleteco.spring.batch.graph.domain;

import java.io.Serializable;
import java.util.Set;

import org.neo4j.graphdb.Direction;
import org.springframework.data.neo4j.annotation.Fetch;
import org.springframework.data.neo4j.annotation.GraphId;
import org.springframework.data.neo4j.annotation.Indexed;
import org.springframework.data.neo4j.annotation.NodeEntity;
import org.springframework.data.neo4j.annotation.RelatedTo;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
 * Node entity describing a single state of a job flow (see {@link StepType}
 * for the kinds of state that are distinguished). Nodes are linked to each
 * other through {@code NEXT_NODE} relationships.
 */
@NodeEntity
public class StepNode implements Serializable {

 private static final long serialVersionUID = 1L;

 /** Identifier field, marked with {@code @GraphId}. */
 @GraphId
 private Long id;

 /** Name of the step; marked {@code @Indexed}. */
 @Indexed
 private String name;

 /** Kind of state this node stands for. */
 private StepType type;

 /** Nodes reached over incoming {@code NEXT_NODE} relationships; left out of JSON output. */
 @JsonIgnore
 @Fetch @RelatedTo(type="NEXT_NODE",direction=Direction.INCOMING)
 private Set<StepNode> parent;

 /** Nodes reached over {@code NEXT_NODE} relationships in the annotation's default direction. */
 @Fetch @RelatedTo(type="NEXT_NODE")
 private Set<StepNode> next;

 /** Creates an empty step node; all properties are left unset. */
 public StepNode() {
  super();
 }

 /**
  * Creates a step node with the given name.
  *
  * @param name the step name
  */
 public StepNode(String name) {
  this();
  this.name = name;
 }

 /** @return the identifier, or {@code null} if none has been set */
 public Long getId() {
  return this.id;
 }

 /** @param id the identifier to set */
 public void setId(Long id) {
  this.id = id;
 }

 /** @return the step name */
 public String getName() {
  return this.name;
 }

 /** @param name the step name to set */
 public void setName(String name) {
  this.name = name;
 }

 /** @return the kind of state this node stands for */
 public StepType getType() {
  return this.type;
 }

 /** @param type the kind of state this node stands for */
 public void setType(StepType type) {
  this.type = type;
 }

 /** @return the nodes on the incoming side of {@code NEXT_NODE} */
 public Set<StepNode> getParent() {
  return this.parent;
 }

 /** @param parent the nodes on the incoming side of {@code NEXT_NODE} */
 public void setParent(Set<StepNode> parent) {
  this.parent = parent;
 }

 /** @return the nodes that follow this one */
 public Set<StepNode> getNext() {
  return this.next;
 }

 /** @param next the nodes that follow this one */
 public void setNext(Set<StepNode> next) {
  this.next = next;
 }

 /**
  * Renders id, name and type only; the related node sets are not included.
  */
 @Override
 public String toString() {
  StringBuilder text = new StringBuilder();
  text.append("StepNode [id=").append(id);
  text.append(", name=").append(name);
  text.append(", type=").append(type);
  text.append("]");
  return text.toString();
 }

}


the Relationship between Steps (the black lines between nodes);
package de.incompleteco.spring.batch.graph.domain;

import java.io.Serializable;

import org.springframework.data.neo4j.annotation.EndNode;
import org.springframework.data.neo4j.annotation.GraphId;
import org.springframework.data.neo4j.annotation.RelationshipEntity;
import org.springframework.data.neo4j.annotation.StartNode;

/**
 * Relationship entity of type {@code NEXT_NODE} connecting one
 * {@link StepNode} (the start) to another (the end).
 */
@RelationshipEntity(type="NEXT_NODE")
public class NextNode implements Serializable {

 private static final long serialVersionUID = 1L;

 /** Identifier field, marked with {@code @GraphId}. */
 @GraphId
 private Long id;

 /** Node the relationship starts from. */
 @StartNode
 private StepNode startNode;

 /** Node the relationship points to. */
 @EndNode
 private StepNode endNode;

 /** @return the identifier, or {@code null} if none has been set */
 public Long getId() {
  return this.id;
 }

 /** @param id the identifier to set */
 public void setId(Long id) {
  this.id = id;
 }

 /** @return the node the relationship starts from */
 public StepNode getStartNode() {
  return this.startNode;
 }

 /** @param startNode the node the relationship starts from */
 public void setStartNode(StepNode startNode) {
  this.startNode = startNode;
 }

 /** @return the node the relationship points to */
 public StepNode getEndNode() {
  return this.endNode;
 }

 /** @param endNode the node the relationship points to */
 public void setEndNode(StepNode endNode) {
  this.endNode = endNode;
 }

}


a supporting "type";
package de.incompleteco.spring.batch.graph.domain;

import org.springframework.batch.core.job.flow.State;
import org.springframework.batch.core.job.flow.support.state.DecisionState;
import org.springframework.batch.core.job.flow.support.state.SplitState;

/**
 * Kinds of flow state distinguished in the graph.
 */
public enum StepType {

 STEP,
 DECISION,
 SPLIT;

 /**
  * Maps a flow state to its type.
  *
  * @param state the state to classify
  * @return {@link #DECISION} for a {@link DecisionState}, {@link #SPLIT} for a
  *         {@link SplitState}, and {@link #STEP} for anything else (including {@code null})
  */
 public static StepType getType(State state) {
  if (state instanceof DecisionState) {
   return DECISION;
  }
  if (state instanceof SplitState) {
   return SPLIT;
  }
  return STEP;
 }

}


Now, a couple of supporting players in this pattern, which exist mainly to handle the transactional aspect of Spring-Data-Neo4J (this is pretty rough and could be cleaned up a lot).

the GraphService;
package de.incompleteco.spring.batch.graph;

import de.incompleteco.spring.batch.graph.domain.JobNode;
import de.incompleteco.spring.batch.graph.domain.StepNode;

/**
 * Persistence operations needed to store a job graph.
 */
public interface GraphService {

 /**
  * Stores the given step node.
  *
  * @param stepNode the node to store
  */
 void saveStepNode(StepNode stepNode);

 /**
  * Links two step nodes so that {@code nextNode} follows {@code startNode}.
  *
  * @param startNode the node the link starts from
  * @param nextNode the node the link points to
  */
 void addNext(StepNode startNode, StepNode nextNode);

 /**
  * Stores the given job node.
  *
  * @param jobNode the node to store
  */
 void saveJobNode(JobNode jobNode);
}


the Neo4JGraphService;
package de.incompleteco.spring.batch.graph;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.neo4j.support.Neo4jTemplate;
import org.springframework.transaction.annotation.Transactional;

import de.incompleteco.spring.batch.graph.domain.JobNode;
import de.incompleteco.spring.batch.graph.domain.NextNode;
import de.incompleteco.spring.batch.graph.domain.StepNode;

/**
 * {@link GraphService} that delegates to a {@link Neo4jTemplate}; each
 * operation is declared {@code @Transactional}.
 */
public class Neo4JGraphService implements GraphService {

 /** Template all operations are delegated to. */
 @Autowired
 private Neo4jTemplate template;

 /**
  * Prints the node to standard output, then saves it through the template.
  *
  * @param stepNode the node to save
  */
 @Override
 @Transactional
 public void saveStepNode(StepNode stepNode) {
  String message = "saving..." + stepNode;
  System.out.println(message);
  this.template.save(stepNode);
 }

 /**
  * Prints both nodes to standard output, then asks the template to create a
  * {@code NEXT_NODE} relationship (mapped by {@link NextNode}) from
  * {@code startNode} to {@code nextNode}.
  *
  * @param startNode the node the relationship starts from
  * @param nextNode the node the relationship points to
  */
 @Override
 @Transactional
 public void addNext(StepNode startNode, StepNode nextNode) {
  String message = "creating from " + startNode + " to " + nextNode;
  System.out.println(message);
  this.template.createRelationshipBetween(startNode, nextNode, NextNode.class, "NEXT_NODE", true);
 }

 /**
  * Saves the job node through the template.
  *
  * @param jobNode the node to save
  */
 @Override
 @Transactional
 public void saveJobNode(JobNode jobNode) {
  this.template.save(jobNode);
 }

 /** @param template the template to delegate to */
 public void setTemplate(Neo4jTemplate template) {
  this.template = template;
 }

}


and finally, the guts of it all, the GraphBuilder;
package de.incompleteco.spring.batch.graph;

import java.lang.reflect.Field;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.FlowJob;
import org.springframework.batch.core.job.flow.State;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.job.flow.support.StateTransition;
import org.springframework.batch.core.job.flow.support.state.DecisionState;
import org.springframework.batch.core.job.flow.support.state.SplitState;
import org.springframework.beans.factory.annotation.Autowired;

import de.incompleteco.spring.batch.graph.domain.JobNode;
import de.incompleteco.spring.batch.graph.domain.StepNode;
import de.incompleteco.spring.batch.graph.domain.StepNodeRepository;
import de.incompleteco.spring.batch.graph.domain.StepType;

/**
 * Walks the state transitions of a Spring Batch flow and stores them as
 * {@link StepNode}s (linked through {@link GraphService#addNext}) plus one
 * {@link JobNode} for the job itself.
 *
 * The flow, its start state, its transition list and each transition's
 * pattern are read reflectively from private fields.
 */
public class GraphBuilder {

 private static final String PATTERN_FIELD = "pattern";
 private static final String END_NEXT_PATTERN = "\\w.*.end\\d.*";
 private static final String FAIL_NEXT_PATTERN = "\\w.*.fail\\d.*";
 private static final String UNKNOWN_PATTERN = "UNKNOWN";
 private static final String DELEGATE_STATE = "DelegateState";
 private static final String FLOW_FIELD = "flow";
 private static final String STATE_TRANSITIONS_FIELD = "stateTransitions";
 private static final String START_STATE_FIELD = "startState";
 
 @Autowired
 private StepNodeRepository stepNodeRepository;
 
 @Autowired
 private GraphService graphService;
 
 /**
  * Builds and persists the graph for a job.
  *
  * @param job the job to translate; it is cast to {@link FlowJob} and its
  *        flow is cast to {@link SimpleFlow}
  * @return the saved job node, whose first node is the step node looked up
  *         by the name of the flow's start state
  * @throws Exception if a reflective field lookup or read fails
  */
 public JobNode buildJobNode(Job job) throws Exception {
  //extract the flow
  FlowJob flowJob = (FlowJob) job;
  SimpleFlow flow = (SimpleFlow) readField(flowJob, FLOW_FIELD);
  //persist the steps and their relationships
  build(flow);
  //the step the job starts with
  State startState = (State) readField(flow, START_STATE_FIELD);
  StepNode startNode = stepNodeRepository.findByName(startState.getName());
  //set into the job
  JobNode jobNode = new JobNode(job.getName());
  jobNode.setFirstNode(startNode);
  //save
  graphService.saveJobNode(jobNode);
  return jobNode;
 }
 
 /**
  * Persists a step node for every relevant transition of the flow, links it
  * to its next node and, for a split, recurses into the inner flows and
  * links the split to each inner flow's start node.
  *
  * A transition without a next state gets its node saved but no outgoing
  * relationship.
  *
  * @param simpleFlow the flow to walk
  * @throws Exception if a reflective field lookup or read fails
  */
 @SuppressWarnings("unchecked")
 public void build(SimpleFlow simpleFlow) throws Exception {
  List<StateTransition> transitions =
    (List<StateTransition>) readField(simpleFlow, STATE_TRANSITIONS_FIELD);
  for (StateTransition stateTransition : transitions) {
   //examine the stateTransition
   State state = stateTransition.getState();
   String next = stateTransition.getNext();
   String pattern = readField(stateTransition, PATTERN_FIELD).toString();
   if (isSkipped(state, pattern, next)) {
    continue;
   }
   //we now have a record to look at; reuse the stored node if there is one
   StepNode node = stepNodeRepository.findByName(state.getName());
   if (node == null) {
    node = new StepNode(state.getName());
   }
   //manage the 'type' and persist
   node.setType(StepType.getType(state));
   graphService.saveStepNode(node);
   //process the 'next' node; next may be null here, so guard before matching
   if (next != null
     && !next.matches(END_NEXT_PATTERN)
     && !next.matches(FAIL_NEXT_PATTERN)) {
    graphService.addNext(node, findOrSave(next));
   }
   //extra handling for a SPLIT: process the 'inner' flows
   if (node.getType() == StepType.SPLIT) {
    for (Flow innerFlow : ((SplitState) state).getFlows()) {
     //build the steps of the inner flow
     build((SimpleFlow) innerFlow);
     //then hang the inner flow's start node off the split
     State innerStart = (State) readField(innerFlow, START_STATE_FIELD);
     graphService.addNext(node, findOrSave(innerStart.getName()));
    }
   }
  }
 }

 /**
  * Decides whether a transition is left out of the graph.
  *
  * @param state the transition's state
  * @param pattern the transition's pattern
  * @param next the name of the next state, possibly {@code null}
  * @return {@code true} if the transition must be skipped
  */
 private static boolean isSkipped(State state, String pattern, String next) {
  if (next != null && next.matches(FAIL_NEXT_PATTERN)) {
   return true;
  }
  if (pattern.equals(UNKNOWN_PATTERN) && next == null) {
   return true;
  }
  if (state.getClass().getSimpleName().equals(DELEGATE_STATE)) {
   return true;
  }
  return state instanceof DecisionState
    && next != null
    && next.matches(END_NEXT_PATTERN);
 }

 /**
  * Looks a step node up by name; if none is stored yet, creates one and saves it.
  *
  * @param name the step name
  * @return the stored or newly saved node
  */
 private StepNode findOrSave(String name) {
  StepNode found = stepNodeRepository.findByName(name);
  if (found == null) {
   found = new StepNode(name);
   graphService.saveStepNode(found);
  }
  return found;
 }

 /**
  * Reads a (possibly private) field, searching the target's class and then
  * its superclasses so that subclasses of the declaring type also work.
  *
  * @param target the object to read from
  * @param fieldName the name of the field
  * @return the field's current value
  * @throws NoSuchFieldException if no class in the hierarchy declares the field
  * @throws IllegalAccessException if the field cannot be read
  */
 private static Object readField(Object target, String fieldName)
   throws NoSuchFieldException, IllegalAccessException {
  for (Class<?> type = target.getClass(); type != null; type = type.getSuperclass()) {
   try {
    Field field = type.getDeclaredField(fieldName);
    field.setAccessible(true);
    return field.get(target);
   } catch (NoSuchFieldException ignored) {
    //not declared at this level; try the superclass
   }
  }
  throw new NoSuchFieldException(fieldName + " on " + target.getClass().getName());
 }

 /** @param stepNodeRepository the repository used to look step nodes up by name */
 public void setStepNodeRepository(StepNodeRepository stepNodeRepository) {
  this.stepNodeRepository = stepNodeRepository;
 }

 /** @param graphService the service used to persist nodes and relationships */
 public void setGraphService(GraphService graphService) {
  this.graphService = graphService;
 }
 
}


There are a few key things to note here:
1. it assumes a FlowJob - this whole pattern doesn't really work without one and a FlowJob is what the XML config will build when parsed
2. it uses reflection to retrieve key pieces - StateTransition list is a private field but we need access to it to interrogate the structure
3. the graph is built after the job has been loaded, so it works regardless of how the job was defined (e.g. namespace, bean, JavaConfig)

and now the Spring-Data-Neo4J config,



 
 
 
 
 
 
 
 
 

 



and our test batch xml.



 
  
 

 
  
 
 
 
  
  
 

 
 
 







 

 
  
   
  
  
   
  
  
   
  
  
   
  
  
   
          
 
 
 
 
  
   
  
  
   
  
  
   
    
     
    
    
     
        
   
   
    
     
    
    
     
       
   
   
    
     
       
      
  
  
   
  
  

 
  
   
    
   
  
  
 



so, now here's the unit test.
package de.incompleteco.spring.batch.graph;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.fasterxml.jackson.databind.ObjectMapper;

import de.incompleteco.spring.batch.graph.domain.JobNode;

/**
 * Integration test that runs the {@link GraphBuilder} against the jobs
 * defined in the Spring context files on the classpath.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath*:/META-INF/spring/*-context.xml"})
public class GraphBuilderIntegrationTest {

 @Resource
 private GraphBuilder builder;

 /** Injected from the context; not used by any test method in this class. */
 @Resource(name="simpleLinearJob")
 private Job job;

 @Resource(name="decisionAndFlowJob")
 private Job complexJob;

 /**
  * Builds the graph for the "decisionAndFlowJob" bean and pretty-prints
  * the resulting job node as JSON to standard output.
  */
 @Test
 public void testBuildComplexJobNode() throws Exception {
  JobNode built = builder.buildJobNode(complexJob);
  //show it
  ObjectMapper mapper = new ObjectMapper();
  mapper.writerWithDefaultPrettyPrinter().writeValue(System.out, built);
 }

}


finally, to help out, here's the maven pom.xml too

  4.0.0
  de.incompleteco.spring.batch.graph
  spring-batch-graph
  0.0.1-SNAPSHOT
  spring-batch-graph
    
  
    
      org.apache.maven.plugins
      maven-compiler-plugin
      
        1.7
        1.7
      
    
    
  
  
   
    org.springframework.batch
    spring-batch-core
    2.2.0.RELEASE
   
   
    org.springframework.data
    spring-data-neo4j
    2.0.2.RELEASE
   
   
    org.neo4j
    neo4j-cypher
    1.6
   
   
    org.springframework
    spring-test
    3.2.2.RELEASE
    test
   
   
    junit
    junit
    4.10
    test
   
   
    com.fasterxml.jackson.core
    jackson-databind
    2.2.0
    test
   
  



No comments:

Post a Comment