Search This Blog

Wednesday, February 17, 2010

Message Ordering in JMS - Using Coherence and Hibernate

As I continue to work with JMS and WebLogic, I wonder about Message Ordering. I have looked for strategies and best practices and have finally done some experimentation that I would like to share.

One of the common patterns when working with Enterprise Integration and Messaging is the Competing Consumer Design Pattern. The same is explained very well in the book "Enterprise Integration Patterns" by Hohpe and Woolf.




As shown above we have consumers that are competing for messages on the queue. As each consumer can process out a message, the consumers scale out horizontally and improve the performance while at the same time guaranteeing high availability.

For the sake of discussion, consider that by some stroke of chance the messages 2 and 3 are for the same item, simply published one after another. With the competing consumer pattern, it is possible that message 3 gets processed before message 2 resulting in an erroneous information propagated to downstream systems such as a database thus compromising the integrity of the data.

One of the ways to prevent this scenario from happening is using a single consumer as shown in the figure below where messages in the queue will be consumed (maybe persisted to the database) in serial order by the consumer:


One problem with this approach is that now we have compromised on the availability and performance of the system as we have only a single consumer performing the task.

The JMS Specification is clear that a messaging system must deliver message in FIFO (First in-First out) order to consumers. However, the specification is silent regarding ordering when related to re-delivery of a message. In the case of a single consumer if there is a problem processing the message and it gets rolled back due to say a problem with the underlying database which the consumer is communicating with, then most providers will not attempt to redeliver the message immediately but do so after a configurable delay. Between this delay, if the database problem is cured and the consumer picks Message 3 and processes the same, then when Message 2 is delivered and processed, the downstream database would contain stale data.

One could say that any rejected message should be placed at the very front of the queue. However, if the message was a poison message and the logic to detect the same and dequeue the message were not built in the consumer, then putting the message back to the front of the queue would render the queue non-consumable, unless there is retry count configured in which case the poison message could be expired or placed in a dead letter queue after the retry count.

A strategy that can be employed where the ordering of the message would not really matter is to use the message queue simply as a notifier with the consumers going back to the source system to get the most current record to process as shown in the figure below:


In the example shown above, the producer sends a message to the queue that is only a notifier with the "RECORD_ID" of the record from the source database. The consumer upon receiving the message goes back to the source database to obtain the payload information and then process the message. By doing so, regardless of the messaging order, the most recent payload for a record is selected.

The above approach works great when one does have access to the source system and represents a tight coupling in the system. From a performance perspective, we save on the size of messages and the fact that only selects need to be issued on the source system. Further there is something to say about the simplicity of the solution. It however crumbles when the consuming end of the application does not have access to the source system resource to obtain the latest data for the record in question.

So what can we do to ensure that we process in the correct order and also have high availability and performance at the same time ? Please note that I my focus is only on the consumption ordering of the message in the consumer system. Regarding production of messages, ordering of placement and strategies is something I am not covering. That said, I am making an assumption that on the production end of the system messages will always be made available in order. Following on this assumption, I am taking the liberty of assuming that the JMS Time stamp uniquely identifies the messages in the order they were sent. Note that this assumption can be wrong but for keeping my examples tractable I am taking this liberty.

That said, I can see two strategies that could be employed. There maybe others that I would love to hear about as well.

1. Database Optimistic Lock based Versioning:

Every JMS Message sent has a time stamp associated with it. If the down stream database that the consumer accesses can maintain a table of RECORD_ID-LAST_UPDATE_DATE then by using Optimistic Versioning one can reject records whose time stamps are older than the one in the database. One can use Pessimistic locking as well, however for high scalable systems, Optimistic locking is definitely preferred.

With an optimistic locking solution employed, only if a record is more recent than the database would the message be processed. If all that one is doing is updating a downstream table with information after any business processing, then you might be able to tag a time stamp column to the table to serve the same purpose. Look at the figure below where both competing consumers are attempting to update the same table which has a last time stamp column for optimistic locking:


In the above shown example, let us say we are receiving updates to existing PERSON records. If M1 and M1' represents updates to the same person with M1 issued at time t and M1' issued at t + dT then, if C1 is the first one to update the Person, then all is good as C2 will update shortly and the record will have the most current value. If C2 updates the database record first and C1 tries to update, it will fail as the JMS time stamp on M1 is older than what is in the database for the Person.

I used Hibernate and HSQL to test this out. Hibernate has a feature for optimistic automatic versioning for detached objects. In order to employ the same, one has to use a Version property. The Version property cannot be set programatically but is auto-updated by Hibernate. For this reason the JMS Time stamp passed in cannot be used to version but can be used as part of the optimistic strategy.
I have a Person object that represents the database model and it looks like the following:
@Entity
@Table(name = "PERSON")
public class Person {
 @Id
 @AccessType(value = "property")
 private Long recordId;

 @Column(name = "FIRST_NAME", nullable = false)
 private String firstName;

 @Column(name = "LAST_NAME", nullable = false)
 private String lastName;

 @Temporal(TemporalType.TIMESTAMP)
 @Column(name = "JMS_TIMESTAMP", nullable = false)
 private Date jmsTimestamp;

 @Version
 @Column(name = "OPT_VER", nullable = false)
 @AccessType(value = "property")
 private Long version;
...
}

Using this code shown above, and the corresponding Data access tier, one can witness optimistic locking in action where messages are rejected or consumed as shown in the output below. Record 13 was rejected as there was a previous record in the database with a newer JMS Time stamp. Record 12 is accepted as although there is an existing record with ID 12, the time stamp on the database record is older than the JMS Time stamp of the message:
18:58:27 ERROR - com.welflex.activemq.AbstractListener.onMessage(35) | Rejecting the Stale Record [13]
com.welflex.exception.StaleObjectException: Stale Record. Database time for the record is [1266631107150], attempted to update with older timestamp [1266614643023]
at com.welflex.hibernate.dao.PersonDaoImpl.saveOrUpdate(PersonDaoImpl.java:27)
at com.welflex.service.PersonServiceImpl.savePerson(PersonServiceImpl.java:29)
at ...
18:58:28 INFO - com.welflex.activemq.AbstractListener.onMessage(32) | Successfully processed Record with ID [12]
......

2. Cache based Pessimistic Lock based Versioning:
In this strategy, I have employed Coherence from Oracle for the demonstration. The strategy employed is that when a message arrives it attempts to acquire a coherence lock for the record in question, upon obtaining the lock a check is performed to see if the corresponding date associated with the record is older than the date of the message being worked on. If the cached record is older, then the process is completed. In the case of the example the resulting operation is updating the record in the database. If the cached record in Coherence was older, then the message is rejected. In either case, the lock is release for the next record. A clear example of Pessimistic locking here. Coherence might allow for optimistic locking and transactions but I have not experimented with the same as yet. A figure demonstrating the strategy is shown below where a record is only updated to the database if the Cache lock is successfully acquired:



One thing to note with this solution, if there are many records, then one would need to be careful regarding how often the cache is expired of its entities. The decision of the same could be made if one has some idea on the frequency in which duplicates could be expected. Another point to note is that if all the node's in the cache collapse, and there were message's that were rolled back to be delivered later, unless the cache is primed or restored in some fashion, the solution could fail.

The code for the pessimistic locking in the cache is shown below:

public <T, E extends Exception> T doCacheAction(String cacheKey, Long timeStamp,
    CacheAction<T, E> action) throws E {
 try {
  idCache.lock(cacheKey, -1);
  Long cacheTime = (Long) idCache.get(cacheKey);
  if (cacheTime == null || timeStamp.longValue() > cacheTime.longValue()) {
    T result = action.doAction();
    idCache.put(cacheKey, timeStamp);
    return result;
  }
  throw new StaleObjectException("Coherence Cached Date [" + cacheTime
     + "] for Record    [" + cacheKey + "] is newer than passed in time ["
     + timeStamp + "]. Rejecting the record....");
  }
  finally {
   idCache.unlock(cacheKey);
 }
}

When an example is run, one can witness the following where Record 10 is successfully processed while record 13 (no wonder) is rejected:
19:24:35 INFO - com.welflex.activemq.AbstractListener.onMessage(32) | Record updated :10
19:24:35 ERROR - com.welflex.activemq.AbstractListener.onMessage(35) | Rejecting the Stale Record [13]
com.welflex.exception.StaleObjectException: Coherence Cached Date [1266632674247] for Record [13] is newer than passed in time [1266632673051]. Rejecting the record....
at com.welflex.service.CoherenceCacheServiceImpl.doCacheAction(CoherenceCacheServiceImpl.java:36)
at com.welflex.activemq.CacheMessageListener.doWithMessage(CacheMessageListener.java:28)
at com.welflex.activemq.AbstractListener.onMessage(AbstractListener.java:31)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
The Sample Code:
Attached herewith is a maven project that demonstrates the above concepts. The example sends messages of a fixed number of record id's to an Active MQ queue and has consumers that demonstrate optimistic or pessimistic locking to reject or accept messages. Spring, the only framework that has my undying admiration is used only for its Listener Container. The Coherence cache in the tests does not demonstrate a clustered cache but the same can be experimented with easily using the code provided. The tests will demonstrate both the cases mentioned above and should be runnable out of the box. I have taken some liberties with Hibernate, JMS and Coherence so don't bother pinning me to the stake for the same, I could care less, I have pinned myself in advance...;-)

Download the code from HERE and execute "mvn test" to see the demonstration of locking/unlocking.

Parting Thoughts:

1. Single Consumer:
The single consumer approach seems to be the most widespread pattern, whether in EJB/MDB land or otherwise for most message ordering solutions. It however is not a very scalable one. Further, if used, I believe it will have to maintain some state as well regarding previous messages received as it would need to deal with messages that were re-delivered. A posting on the Server side regarding EJB/MDB and ordering of messages.

2. The "Going back to the Source" pattern:
The pattern is acceptable clearly if you have access to the source. In terms of simplicity it is definitely the best possible solution to the problem. All the JMS Message becomes is an event notifier with a record id. I must however say that from a purely emotional perspective, it just feels ugly due to the tight coupling between the producer and consumer ends of the application. Again, note that it will not work when the consumer cannot access the source system.

3. Database based ordering:
Database for message ordering works well when you have control on the schema. If working with a legacy system, this can prove an impediment. Apart from not having a dependency on the source system, one advantage to the database approach is that due to the persistent nature of the data, surviving failures and message rollbacks will work really well which might prove a problem with a Caching solution.

4. Caching based ordering:
The caching based solution works when one does not have access to a datastore and cannot introduce database versioning. I am not certain how this will perform when the messages are varied, i.e., many records and have not experimented with cache priming or recovery.

Both in the case of the Database and the Caching based solution, the expectancy is definitely that each message contains a identifier of a record, where it is a synthetic id or natural id or a complex unique key that can differentiate it from other messages as its the identifier that is primal for the locking strategies to work.

It is also possible that message ordering is not an issue for many cases as one can always build a data integrity validating process to ensure integrity after the fact or depend on the fact that a future update will fix the data integrity issue if present. Another case maybe for manual intervention to fix the one off issues. Do however note that it does become an issue if real time information is required.

If the message updates are for inserts or creations, then ordering is hardly an issue. If they are updates of information then ordering becomes more important. In cases where incremental updates and their order needs to be preserved is important, then the above mentioned locking strategies will not work.

The book I mentioned talks for re-sequencing of messages and restoring order, I am however not convinced that performance will not be impacted by the same.

Oracle WebLogichas a message ordering solution in the form of Unit Of Order. However, from experiments we have found that for a clustered highly available environment, the solution does not work. For any questions on the same, mail or post.

As always, for each use case there is a specific solution that will work...I would love to hear about people's experiences with message ordering or just any thoughts. If however you are trying to promote your jewelery site, please don't post here :-)

7 comments:

Ian said...

Very good article. You mentioned that Weblogic's Unit of Order mechanism doesn't work in a clustered HA environment, do you mind elaborate more on why?

Many thanks
Ian

Sanjay Acharya said...

When using unit of order, messages for that unit are hashed and sent to the same physical server. So lets say we have a user with id=123, i.e., the unit of order and we put a message in a queue for the user, it gets sent to a physical server A. All updates to that user will also get sent to the same server.

If that server A goes down, then messages for the user with id=123 cannot be even sent any more without attempting a server migration or until server A comes back up.

This was the behavior we encountered. If things have changed since then or we were incorrect, I'd love to know :-)

Anonymous said...

Weblogic provides path service for message ordering. it is better than Hash.

Nic

Anonymous said...

Very good article thank you ! however the source code link is not working giving 404, can you please fix it

Anonymous said...

Great article thank you, link to sourcode is broken can you please fix it

Sanjay Acharya said...

Sorry, for the 404. Has been fixed.

Anonymous said...

The briefly mentioned and discarded WebLogic JMS Unit of Order option neatly solves the strictly ordered redelivery problem with very little additional coding -- simply mark each message with a UOO name, and WL JMS will guarantee FIFO for messages with the same UOO name, while allowing parallel processing of messages with different UOO names.

The clustered JMS high availability issue mentioned above where a UOO send may fail even if a single JMS Server itself fails is very often addressable via additional tuning -- WebLogic JMS provides 'automatic service migration/restart' and 'whole server migration/restart' for this purpose. The downside is that an automatic service restart may take several seconds to detect and recover from a failure. See https://docs.oracle.com/middleware/12213/wls/JMSAD/best_practice.htm for details.