Wednesday, April 24, 2013

Spring Integration - Losing the ReplyChannel when sending over JMS

a scenario you may have encountered is the need to "asynchronously" get a request/response in Spring Integration via JMS.  that is, you want to send multiple, non-blocking JMS messages out, then pair them up on the way back in.

there are a couple of options to do this:
1. use a dispatcher (task executor) on your outbound channel to a JMS outbound-gateway - this will use a thread from the pool to send each request and block until its reply arrives.
2. use a JMS outbound-channel-adapter (send) in conjunction with a message-driven-channel-adapter (receive) - this will use a thread pool on the receive side only.
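
to picture option 1 outside of Spring, here is a minimal plain-Java sketch: each pooled thread sends one request and blocks until its own reply comes back. sendAndReceive is a hypothetical stand-in for the blocking call a JMS outbound-gateway makes; the class and method names are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PooledRequestReply {

    // hypothetical stand-in for a blocking gateway call (send, then wait for the reply)
    static String sendAndReceive(String request) {
        return "reply-to-" + request;
    }

    // submit every request to the pool, then collect replies in request order
    public static List<String> sendAll(List<String> requests, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String request : requests) {
                // each task occupies one pooled thread until its reply arrives
                pending.add(pool.submit(() -> sendAndReceive(request)));
            }
            List<String> replies = new ArrayList<>();
            for (Future<String> future : pending) {
                replies.add(future.get());
            }
            return replies;
        } finally {
            pool.shutdown();
        }
    }
}
```

the cost of this approach is visible in the sketch: the number of requests in flight is capped by the pool size, since every waiting request holds a thread.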

to support the 2nd option, you can set a replyChannel header on the message, allowing you to 'route responses'.  an example of this is in org.springframework.batch.integration.partition.MessageChannelPartitionHandler from the Spring Batch Integration project.  the rub is, the replyChannel gets 'lost' when sending over JMS; it doesn't get translated to the wire.

nor should it.  the consumer of the message on the other end may be Spring Integration driven too and have its own replyChannel settings.  so, one solution may be to use an extended HeaderMapper.

the intent is to register any dynamic channels in the applicationContext (in the beanFactory, as singletons).  then, set the bean name on the outgoing message in a JMS-compatible format (a String property).  this header field will be 'left alone' by the consumer and returned.  on its return, the replyChannel object is looked up by that name and set as the replyChannel header value, allowing Spring Integration to route the reply appropriately.
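
the round trip described above can be sketched without Spring or JMS at all. in this minimal sketch a plain Map stands in for the bean factory and another Map for the JMS string properties; ReplyChannelRoundTrip, toWire and fromWire are hypothetical names used only for illustration, not part of any library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class ReplyChannelRoundTrip {

    static final String SENDER_REPLY_CHANNEL_NAME = "senderReplyChannelName";

    // stands in for the application context's singleton registry (assumption)
    private final Map<String, Object> registry = new HashMap<>();

    // outbound: register the channel object under a generated name and
    // put only that name (a String) into the wire properties
    public Map<String, String> toWire(Object replyChannel) {
        String name = UUID.randomUUID().toString();
        registry.put(name, replyChannel);
        Map<String, String> wireProperties = new HashMap<>();
        wireProperties.put(SENDER_REPLY_CHANNEL_NAME, name);
        return wireProperties;
    }

    // inbound: look the name up again; returns null if the property is absent or unknown
    public Object fromWire(Map<String, String> wireProperties) {
        String name = wireProperties.get(SENDER_REPLY_CHANNEL_NAME);
        return name == null ? null : registry.get(name);
    }

    public static void main(String[] args) {
        ReplyChannelRoundTrip mapper = new ReplyChannelRoundTrip();
        Object channel = new Object(); // placeholder for a real reply channel
        Map<String, String> props = mapper.toWire(channel);
        // the consumer echoes the property back untouched
        System.out.println(mapper.fromWire(props) == channel); // prints "true"
    }
}
```

only the String name ever crosses the wire, so the consumer never needs to know what kind of object the sender uses for replies.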

here's an example of the header mapper:


import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.jms.DefaultJmsHeaderMapper;

public class JMSReplyChannelJmsHeaderMapper extends DefaultJmsHeaderMapper implements ApplicationContextAware {

 private final Log logger = LogFactory.getLog(this.getClass());
 public static final String SENDER_REPLY_CHANNEL_NAME = "senderReplyChannelName";
 private ApplicationContext applicationContext;
 public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
  //process the parent normally
  super.fromHeaders(headers, jmsMessage);
  //now check if the headers contains a replyChannel
  if (headers.getReplyChannel() != null) {
   //now process it
   Object replyChannel = headers.getReplyChannel();
   String replyChannelName = null;
   if (replyChannel instanceof String) {
    //it's the name
    replyChannelName = replyChannel.toString();
   } else {
    //test if the object exists in the app context already
    if (!applicationContext.containsBean(replyChannel.toString())) {
     //generate a name --> use the id
     replyChannelName = headers.getId().toString();
     //set into the app context
     ((ConfigurableApplicationContext)applicationContext).getBeanFactory().registerSingleton(replyChannelName, replyChannel);     
    } else {
     replyChannelName = replyChannel.toString();
    }//end if
   }//end if
   //now add a message header
   try {
    jmsMessage.setStringProperty(SENDER_REPLY_CHANNEL_NAME, replyChannelName);
   } catch (JMSException e) {
    logger.warn("failed to set senderReplyChannelName, skipping", e);
   }//end try
  }//end if
 }

 public Map<String, Object> toHeaders(Message jmsMessage) {
  Map<String, Object> headers = super.toHeaders(jmsMessage);
  //check if there's a SENDER_REPLY_CHANNEL_NAME
  try {
   if (jmsMessage.getStringProperty(SENDER_REPLY_CHANNEL_NAME) != null) {
    //check if the name maps to the app context
    String replyChannelName = jmsMessage.getStringProperty(SENDER_REPLY_CHANNEL_NAME);
    if (applicationContext.containsBean(replyChannelName)) {
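     //got it: look the channel up and restore it as the replyChannel header
     logger.debug("setting: " + replyChannelName + " as reply channel");
     headers.put(MessageHeaders.REPLY_CHANNEL, applicationContext.getBean(replyChannelName));
    }//end if
   }//end if
  } catch (JMSException e) {
   logger.warn("failed to process the senderReplyChannelName, skipping", e);
  }//end try
  return headers;
 }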

 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  this.applicationContext = applicationContext;
 }
}


and here's a rather long configuration using the MessageChannelPartitionHandler















