SyntaxHighlighter

Saturday, July 13, 2013

Spring Integration JMS to Database

Spring Integration JMS to Database


This is a quick design to persist a very basic JMS message directly to a database. This pattern could be extended to a use case where you want to consume messages from a JMS queue very quickly (and in order), persist them into a staging table, and then raise a subsequent event for secondary processing of the messages.

Anyway, here's the consumption part in Spring Integration:



 
 
 
 
  
  
 
  
   
    
    
    
   
  
 
 



Now, here's a sample message:

MSFT,100.00,1373761697932

Here's the resource configuration:



 
  
 

 
  
 
 
 
   
 


And finally, a JUnit test:

package de.incompleteco.spring.integration;

import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INf/spring/*-context.xml"})
public class MessagingIntegrationTest {

 @Resource
 private ConnectionFactory connectionFactory;
 
 @Resource
 private Queue queue;
 
 @Resource
 private DataSource dataSource;
 
 @Before
 public void before() throws Exception {
  new JdbcTemplate(dataSource).execute("delete from target_table");
 }
 
 @Test
 public void test() throws Exception {
  //check in the database
  int count = new JdbcTemplate(dataSource).queryForObject("select count(*) from target_table",Integer.class);
  //check
  assertTrue(count == 0);  
  
  //read in a message and send it
  BufferedReader reader = new BufferedReader(new InputStreamReader(new DefaultResourceLoader().getResource("classpath:/data/msft.msg").getInputStream()));
  final String message = reader.readLine();
  //send the message
  new JmsTemplate(connectionFactory).send(queue,new MessageCreator() {

   @Override
   public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message);
   } });
  //wait and check
  Thread.sleep(100);
  //check in the database
  count = new JdbcTemplate(dataSource).queryForObject("select count(*) from target_table",Integer.class);
  //check
  assertTrue(count == 1);
 }

}


One thing to note is that with Spring XD on its way, this consumption becomes a little redundant; Spring XD does it better :)

No comments:

Post a Comment