Tuesday, April 6, 2010

BatchMessageListenerContainer using Spring and MessageProxies

I want to share a couple of things in this post. The first is a way to consume messages in batches using Spring's listener containers. The second is an experiment in using dynamic proxies to send messages, along with an application of the same.

1. Batch Message Listener:
When using the Spring listener container, one typically consumes a single message at a time. This works in most scenarios. There might, however, be scenarios where consuming multiple messages and performing a task in a batch is more efficient. One way to ensure batching is to have the message itself be an aggregation of the multiple payloads that need to be processed together. The consumer can then treat this single message as an atomic unit and either consume or reject it. This works only if the producer of the messages can, and will, group the messages together. It also means that the producer needs to understand the consumer's needs regarding batch efficiency.

What about another way of handling this scenario, where a batch message listener is used? In this proposal, one receives up to X messages together as a unit and either processes them all or rolls them all back. A consumer can now choose to capitalize on the performance gains of batching. There is, of course, a cost to the messaging system, as it must hold the messages in the batch as unprocessed for as long as the JMS transaction is active.

I found that the spring-batch project at one time had a BatchMessageListenerContainer that did some batching. I could not find it in later versions of the framework, so I created one that does the same. It is based on the DefaultMessageListenerContainer and has the following requirements:
  1. Receive at most X messages and process the same as a unit of work.
  2. If X messages cannot be received before a JMS Session timeout, and only X-dX messages have been received when the timeout occurs, then process the X-dX received messages even though the batch size was not met.
  3. Commit or rollback applies to the entire set of messages. This means that if there is a bad message in the bunch, all messages in the batch are rolled back.
  4. Only JMS transacted sessions are supported. User transactions are not supported at this point, though adding them should be easy.
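
The first two requirements can be pictured without any JMS dependency. The following is a minimal sketch, not the container's code: a BlockingQueue stands in for the JMS consumer, and the names BatchDrainSketch and receiveBatch are made up for illustration. Note that the timeout here applies to each individual receive, so in the worst case a batch may take roughly batchSize times the timeout to assemble.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: a BlockingQueue stands in for the JMS MessageConsumer.
final class BatchDrainSketch {

  /**
   * Collects up to batchSize items, waiting at most timeoutMillis for each one.
   * Returns a partial (possibly empty) batch as soon as a single poll times out.
   */
  static <T> List<T> receiveBatch(BlockingQueue<T> source, int batchSize, long timeoutMillis) {
    List<T> batch = new ArrayList<>();
    try {
      while (batch.size() < batchSize) {
        T item = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (item == null) {
          break; // timed out: hand over whatever has been collected so far
        }
        batch.add(item);
      }
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
    }
    return batch;
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    for (int i = 1; i <= 5; i++) {
      queue.add("msg-" + i);
    }
    System.out.println(receiveBatch(queue, 3, 50)); // a full batch of three
    System.out.println(receiveBatch(queue, 3, 50)); // a partial batch of two
    System.out.println(receiveBatch(queue, 3, 50)); // nothing left, so an empty batch
  }
}
```

In the real container, an empty batch corresponds to the "no message received" path, and a non-empty one is handed to the listener inside the transacted session.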

As the SessionAwareMessageListener from Spring does not have a signature that supports multiple messages, I created one called SessionAwareBatchMessageListener:
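import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

public interface SessionAwareBatchMessageListener<M extends Message> {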
  /**
   * Perform a batch action with the provided list of {@code messages}.
   * 
   * @param session JMS {@code Session} that received the messages
   * @param messages List of messages to be processed as a unit of work
   * @throws JMSException JMSException thrown if there is an error performing the operation.
   */
  public void onMessages(Session session, List<M> messages) throws JMSException;
}

The BatchMessageListenerContainer that I am demonstrating is an extension of the DefaultMessageListenerContainer and accepts the newly created listener. Note that all this is possible thanks to the design of the Spring code, which allows for such extensions.

The container receives messages until either the batch size is hit or a JMS Session timeout occurs, and then dispatches them to the listener to complete the operation. The code is a bit verbose but is shown in its entirety below:
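import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.TransactionStatus;

public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {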
  public static final int DEFAULT_BATCH_SIZE = 100;
  private int batchSize = DEFAULT_BATCH_SIZE;

  public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
  
  /**
   * The doReceiveAndExecute() method has to be overridden to support multiple-message receives.
   */
  @Override
  protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
    TransactionStatus status) throws JMSException {
    Connection conToClose = null;
    MessageConsumer consumerToClose = null;
    Session sessionToClose = null;

    try {
      Session sessionToUse = session;
      MessageConsumer consumerToUse = consumer;
  
      if (sessionToUse == null) {
        Connection conToUse = null;
        if (sharedConnectionEnabled()) {
          conToUse = getSharedConnection();
        }
        else {
          conToUse = createConnection();
          conToClose = conToUse;
          conToUse.start();
        }
        sessionToUse = createSession(conToUse);
        sessionToClose = sessionToUse;
      }
      
      if (consumerToUse == null) {
        consumerToUse = createListenerConsumer(sessionToUse);
        consumerToClose = consumerToUse;
      }
      
      List<Message> messages = new ArrayList<Message>();

      int count = 0;
      Message message = null;
      // Attempt to receive messages with the consumer
      do {
        message = receiveMessage(consumerToUse);
        if (message != null) {
          messages.add(message);
        }
      }
      // Exit loop if no message was received in the time out specified, or
      // if the max batch size was met
      while ((message != null) && (++count < batchSize));

      if (messages.size() > 0) {
        // Only if messages were collected, notify the listener to consume the same.
        try {
          doExecuteListener(sessionToUse, messages);
          sessionToUse.commit();
        }
        catch (Throwable ex) {
          handleListenerException(ex);
          if (ex instanceof JMSException) {
            throw (JMSException) ex;
          }
        }
        return true;
      }

      // No message was received for the period of the timeout, return false.
      noMessageReceived(invoker, sessionToUse);
      return false;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumerToClose);
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
    }
  }

  protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
    if (!isAcceptMessagesWhileStopping() && !isRunning()) {
      if (logger.isWarnEnabled()) {
        logger.warn("Rejecting received messages because of the listener container "
          + "having been stopped in the meantime: " + messages);
      }
      rollbackIfNecessary(session);
      throw new JMSException("Rejecting received messages as listener container is stopping");
    }

    @SuppressWarnings("unchecked")
    SessionAwareBatchMessageListener<Message> lsnr = (SessionAwareBatchMessageListener<Message>) getMessageListener();

    try {
      lsnr.onMessages(session, messages);
    }
    catch (JMSException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (RuntimeException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (Error err) {
      rollbackOnExceptionIfNecessary(session, err);
      throw err;
    }
  }
  
  @Override
  protected void checkMessageListener(Object messageListener) {
    if (!(messageListener instanceof SessionAwareBatchMessageListener)) {
      throw new IllegalArgumentException("Message listener needs to be of type ["
        + SessionAwareBatchMessageListener.class.getName() + "]");
    }
  }
 
  @Override
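  protected void validateConfiguration() {
    // Keep the parent's checks (for example, that a destination has been set)
    super.validateConfiguration();
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
    }
  }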
}
The attached code includes a demonstration of how messages can be received in a batch and processed by the consumer.

2. Proxy pattern for JMS Message Sending:
Recently I have been seeing quite a few REST clients that support a proxy pattern, and I wanted to experiment to see whether the same could be applied to JMS as well.

A RESTful system has some similarities with JMS:
  1. Resource location can be compared to the JMS Destination
  2. MIME type can be compared to the JMS Message Type
So if we define a couple of annotations, one for the Destination JNDI name and a second for the MessageType, a proxy interface could look like the following:
public interface MessageSender {
    @DestinationJndi("com.welflex.barqueue")
    @MessageType(ObjectMessage.class)
    public void sendObjectMessage(String s);

    @DestinationJndi("com.welflex.baz.queue")
    @MessageType(TextMessage.class)
    public void sendStringMessage(String s);

    @DestinationJndi("com.welflex.boo.queue")
    @MessageType(MapMessage.class)
    public void sendMapMessage(Map<String, Object> map);
}
We could then create a proxy based on the above interface and send messages as shown below:
public void testMethod() {
    SendExecutor executor = new ActiveMQSendExecutor("vm://localhost", "foo", "bar");
    MessageSender s = ProxyFactory.create(MessageSender.class, executor);
    s.sendObjectMessage("Foo");
    s.sendStringMessage("Bar");
    
    Map<String, Object> m = new HashMap<String, Object>();

    m.put("name", "Sanjay");
    m.put("age", 23);
    m.put("salary", 12313131.213F);    

    s.sendMapMessage(m);  
}  
I am not explaining the details of how the proxy is created, as they are available in the downloadable example. One can easily swap out the ActiveMQSendExecutor for a WebLogicSendExecutor, an OpenJMSSendExecutor, or any other JMS provider implementation.
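
For readers without the download at hand, here is a rough sketch of how such a proxy could be built with java.lang.reflect.Proxy. It is not the code from the example: the SendExecutor signature, the MessageKind annotation (a plain String in place of a JMS message class, to keep the sketch free of JMS dependencies), and the create method are all assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Illustration only: every type in here is a simplified, hypothetical stand-in.
final class MessageProxySketch {

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface DestinationJndi { String value(); }

  // Simplification: a String kind rather than a javax.jms message class.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface MessageKind { String value(); }

  /** Assumed executor contract; a provider-specific class would implement it. */
  interface SendExecutor {
    void send(String destinationJndi, String messageKind, Object payload);
  }

  interface MessageSender {
    @DestinationJndi("com.welflex.baz.queue")
    @MessageKind("text")
    void sendStringMessage(String s);
  }

  /** Builds a proxy that turns each annotated method call into one executor.send(...) call. */
  @SuppressWarnings("unchecked")
  static <T> T create(Class<T> type, SendExecutor executor) {
    InvocationHandler handler = (proxy, method, args) -> {
      if (method.getDeclaringClass() == Object.class) {
        // toString, hashCode and equals are also routed through the handler
        switch (method.getName()) {
          case "toString": return type.getSimpleName() + " proxy";
          case "hashCode": return System.identityHashCode(proxy);
          default: return proxy == args[0]; // equals
        }
      }
      DestinationJndi destination = method.getAnnotation(DestinationJndi.class);
      MessageKind kind = method.getAnnotation(MessageKind.class);
      if (destination == null || kind == null) {
        throw new IllegalStateException("Missing annotations on " + method.getName());
      }
      Object payload = (args == null || args.length == 0) ? null : args[0];
      executor.send(destination.value(), kind.value(), payload);
      return null; // the sender methods return void
    };
    return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
  }

  public static void main(String[] args) {
    List<String> sent = new ArrayList<>();
    // A recording executor replaces a real JMS provider for the demo
    SendExecutor recording = (dest, kind, payload) -> sent.add(kind + " -> " + dest + ": " + payload);
    MessageSender sender = create(MessageSender.class, recording);
    sender.sendStringMessage("Bar");
    System.out.println(sent);
  }
}
```

Because the proxy depends only on the executor contract, swapping providers comes down to passing a different executor to create.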

Many questions arise from the above:
  1. What about controlling the message properties? The same concern applies to the header properties that one needs to handle with REST proxy clients. One can define an execution interceptor to provide them :-)
  2. JMS is so simple; is another layer of abstraction really worth it? I agree, mostly, but if all a client needs to do is send a particular type of message and we can abstract away the boilerplate, why not? Sure, there is some initial development in creating the send executors. Once that is done, it's as easy as annotating an interface and dispatching :-)
  3. What about bytes messages and other types? I have not handled them.
Clearly there are more questions here. The above is just one of my Frankenstein experiments that I thought I'd share :-)

3. Examples:
Examples that use the above code are available in the provided download. A sample batch consumer is demonstrated that persists Person entries to an HSQL database using Hibernate. Messages are consumed 200 at a time, and Hibernate flushes the session after every hundred records.
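
The flush-every-hundred idea can be sketched independently of Hibernate. In the following illustration, Store is a hypothetical stand-in for a persistence session, and saveAll is a made-up helper rather than code from the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: Store is a hypothetical stand-in, not a Hibernate type.
final class PeriodicFlushSketch {

  interface Store<T> {
    void save(T item);
    void flush();
  }

  /**
   * Saves every item, flushing after each flushEvery saves and once more for
   * any remainder. Returns the number of flushes performed.
   */
  static <T> int saveAll(List<T> batch, Store<T> store, int flushEvery) {
    if (flushEvery <= 0) {
      throw new IllegalArgumentException("flushEvery must be greater than 0");
    }
    int flushes = 0;
    int pending = 0;
    for (T item : batch) {
      store.save(item);
      if (++pending == flushEvery) {
        store.flush();
        flushes++;
        pending = 0;
      }
    }
    if (pending > 0) { // flush the tail that did not fill a whole chunk
      store.flush();
      flushes++;
    }
    return flushes;
  }

  public static void main(String[] args) {
    List<String> people = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
      people.add("person-" + i);
    }
    Store<String> noop = new Store<String>() {
      @Override public void save(String item) { }
      @Override public void flush() { }
    };
    System.out.println(saveAll(people, noop, 100)); // prints 2
  }
}
```

With Hibernate, the flush step would typically be a call to session.flush(), often followed by session.clear() to keep the first-level cache small.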

Examples of the proxy class usage are also included, for sending different types of messages.

Download a Maven Example from HERE.

19 comments:

Sathya said...

m.put("age", 23);

Sanjay, You never told me your age was 23 ;).

Sanjay Acharya said...

I thought it was visually evident :-)

Fahim said...

Super concept!!! If it works, it will save my day. Thanks a lot..

Anonymous said...

The download link for source doesn't seem to work anymore. Can you please update it?

Sanjay Acharya said...

Comcast had a problem..go ahead and try now.

Anonymous said...

I'm sorry dude.. missed your comment and can't download now. Can you please enable the download link again? Looks like you hosted the zip file locally. I'll make sure not to miss out this time

Thanks again.

Sanjay Acharya said...

Not sure. I can seem to download just fine.

Anonymous said...

I was able to download now...I have a similar batch processing requirement and your post has been helpful ..thanks a lot

Giovanni Botta said...

Thanks for the post. It took me the whole morning to find something like this on the web. I have a question: I'm a spring/JMS newbie and I'm trying to read N messages from a queue and persist them in a database in a single transaction (I'm using atomikos for that). Do you have any suggestion about how to achieve this?
Thanks!

Giovanni

Sanjay Acharya said...

The batch message listener container is doing nothing more than reading n number of messages in a JMS Session and then committing the same. It looks like you are planning on using 2PC. I don't have too much experience with the same but that said, you can always commit your DB transaction and then your JMS transaction. There is still a chance of re-delivery of JMS messages, but if your consumer is idempotent, that should not be a problem.

Santhosh Srinivasulu said...

Thank you very much for the post! I had a similar requirement and was lucky to come across your post, really helpful!

Chandra Mohan said...

Hi, how do I mention a custom Listener container in spring xml configuration? Providing the bean name in the container-class doesn't seem to work.

Chandra Mohan said...

with reference to above comment, see http://stackoverflow.com/questions/19153102/how-to-specify-custom-jms-message-listener-in-spring-xml for more details.

Anonymous said...

Nice article. The download link for the source code doesn't seem to work. Can you please check it?

Anonymous said...

Nice Article!! The download link for source doesn't seem to work anymore. Can you please check it?

Justin Robbins said...

Hello, I know this post is quite old but unfortunately the download link is broken. Any possibility of reposting the zip or pushing it to Github? Thanks!

Sanjay Acharya said...

Ha! @JustinRobbins, you got it :-)! Cheers!

Justin Robbins said...

Thanks for posting it. Many thanks!

Java Developer said...

I think the parameter receiveTimeout is used for each receive method and not counted from the time when session is started ?