Wednesday, April 24, 2013

Spring Integration - Losing the ReplyChannel when sending over JMS

a scenario you may have encountered is the need to get a request/response "asynchronously" in Spring Integration via JMS.  that is, you want to send multiple, non-blocking JMS messages out, then pair the replies up with their requests on the way back in.

there are a couple of ways to do this:
1. use a dispatcher (task executor) on the channel feeding a JMS outbound-gateway - each request then takes a thread from the pool, which sends the message and blocks waiting for the reply.
2. use a JMS outbound-channel-adapter (send) in conjunction with a message-driven-channel-adapter (receive) - this will use a thread pool on the receive side.
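
to make the 'pair them up on the way back in' part concrete, here's a minimal sketch in plain Java. it uses no Spring Integration or JMS types at all; the class name, the transport callback and the correlation id scheme are all assumptions made up for illustration. the idea is only that a send returns immediately and a later reply is matched to its request by id.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Plain-Java illustration only: no Spring Integration or JMS types are used.
// All names here are hypothetical.
final class AsyncRequestReplySketch {

    // Requests that have been sent but not yet answered, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Non-blocking send: record the correlation id, hand the request to the
    // (hypothetical) transport, and return immediately with a future reply.
    CompletableFuture<String> send(String payload, BiConsumer<String, String> transport) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        transport.accept(correlationId, payload);
        return reply;
    }

    // Called by the receiving side, on any thread, when a reply arrives.
    // Returns false if no pending request matches the correlation id.
    boolean onReply(String correlationId, String replyPayload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(replyPayload);
    }
}
```

replies can arrive in any order and on any thread; each one completes only the future of the request it belongs to.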

to support the 2nd option, you can set a replyChannel header on the message, allowing you to 'route responses'.  an example of this is in org.springframework.batch.integration.partition.MessageChannelPartitionHandler from the Spring Batch Integration project.  the rub is, the replyChannel gets 'lost' when sending over JMS; it doesn't get translated to the wire.

nor should it.  the consumer of the message on the other end may be Spring Integration driven too and have its own replyChannel settings.  so, one solution may be to use an extended HeaderMapper.

the intent is to register any dynamic channel in the applicationContext (in the beanFactory, as a singleton), then write its bean name into a JMS-compatible (String) message property.  this property will be 'left alone' by the consumer and returned with the reply.  on its return, the name is used to look up the channel object, which is then set as the replyChannel header, allowing Spring Integration to route the reply appropriately.
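
the round trip can be sketched in plain Java before looking at the real thing. this is only an illustration under assumptions: a Map stands in for the applicationContext's singleton registry, a Map of Strings stands in for the JMS message properties, and the class and method names are made up. it is not the Spring Integration API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java illustration only; every name here is hypothetical.
final class ReplyChannelNameSketch {

    static final String REPLY_CHANNEL = "replyChannel";
    static final String SENDER_REPLY_CHANNEL_NAME = "senderReplyChannelName";

    // Stand-in for the application context's singleton registry.
    private final Map<String, Object> singletons = new ConcurrentHashMap<>();

    // Outbound: only String values survive the "wire". A channel object is
    // registered under a generated name and that name travels instead;
    // a channel already given by name is passed through unchanged.
    Map<String, String> toWire(Map<String, Object> headers) {
        Map<String, String> wire = new HashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (!REPLY_CHANNEL.equals(entry.getKey()) && entry.getValue() instanceof String) {
                wire.put(entry.getKey(), (String) entry.getValue());
            }
        }
        Object replyChannel = headers.get(REPLY_CHANNEL);
        if (replyChannel instanceof String) {
            wire.put(SENDER_REPLY_CHANNEL_NAME, (String) replyChannel);
        } else if (replyChannel != null) {
            String name = UUID.randomUUID().toString();
            singletons.put(name, replyChannel);
            wire.put(SENDER_REPLY_CHANNEL_NAME, name);
        }
        return wire;
    }

    // Inbound: if the returned name is known to the registry, restore the
    // channel object as the replyChannel header; unknown names are skipped.
    Map<String, Object> fromWire(Map<String, String> wire) {
        Map<String, Object> headers = new HashMap<>(wire);
        String name = wire.get(SENDER_REPLY_CHANNEL_NAME);
        if (name != null && singletons.containsKey(name)) {
            headers.put(REPLY_CHANNEL, singletons.get(name));
        }
        return headers;
    }
}
```

the sketch never removes entries from its registry, so a long-running sender would want some form of cleanup once a reply has been routed.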

here's an example of the header mapper

package de.incompleteco.spring.integration.jms.support;

import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.jms.DefaultJmsHeaderMapper;

public class JMSReplyChannelJmsHeaderMapper extends DefaultJmsHeaderMapper implements ApplicationContextAware {

 private final Log logger = LogFactory.getLog(this.getClass());
 
 public static final String SENDER_REPLY_CHANNEL_NAME = "senderReplyChannelName";
 
 private ApplicationContext applicationContext;
 
 @Override
 public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
  //process the parent normally
  super.fromHeaders(headers, jmsMessage);
  //now check if the headers contains a replyChannel
  if (headers.getReplyChannel() != null) {
   //now process it
   Object replyChannel = headers.getReplyChannel();
   String replyChannelName = null;
   //check
   if (replyChannel instanceof String) {
    //it's the name
    replyChannelName = replyChannel.toString();
   } else {
    //test if the object exists in the app context already
    //(assumes toString() on a context-managed channel returns its bean name)
    if (!applicationContext.containsBean(replyChannel.toString())) {
     //generate a name --> use the id
     replyChannelName = headers.getId().toString();
     //set into the app context
     ((ConfigurableApplicationContext)applicationContext).getBeanFactory().registerSingleton(replyChannelName, replyChannel);     
    } else {
     replyChannelName = replyChannel.toString();
    }//end if
   }//end if
   //now add a message header
   try {
    jmsMessage.setStringProperty(SENDER_REPLY_CHANNEL_NAME, replyChannelName);
   } catch (JMSException e) {
    logger.info("failed to set senderReplyChannelName, skipping", e);
   }
  }//end if
 }

 @Override
 public Map<String, Object> toHeaders(Message jmsMessage) {
  Map<String, Object> headers = super.toHeaders(jmsMessage);
  //check if there's a SENDER_REPLY_CHANNEL_NAME
  try {
   if (jmsMessage.getStringProperty(SENDER_REPLY_CHANNEL_NAME) != null) {
    //check if the name maps to the app context
    String replyChannelName = jmsMessage.getStringProperty(SENDER_REPLY_CHANNEL_NAME);
    if (applicationContext.containsBean(replyChannelName)) {
     //got it
     logger.info("setting: " + replyChannelName + " as reply channel");
     headers.put(MessageHeaders.REPLY_CHANNEL,applicationContext.getBean(replyChannelName));
    }//end if
   }
  } catch (JMSException e) {
   logger.info("failed to process the senderReplyChannelName, skipping", e);
  }
  //return
  return headers;
 }

 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  this.applicationContext = applicationContext;
 }

}


and here's a rather long configuration using the MessageChannelPartitionHandler

[XML configuration listing not preserved]