
Tuesday, June 23, 2009

Scaling in the JMS World...in particular Topics

This post looks at scalability and availability in JMS. For the former, as load on the messaging system increases, one wants to handle the additional messages without performance degradation. For the latter, one should reduce, if not eliminate, single points of failure so that the system remains available for use.

Most JMS providers have some mechanism for addressing both. This blog post, however, concentrates only on Oracle WebLogic.

WebLogic has an interesting way in which JMS Destinations (Queues and Topics) are scaled via Distributed or Virtual Destinations. A Virtual Destination can be considered a sort of router/load balancer that distributes messages it receives over to actual or physical destinations. Each physical destination would reside in separate WebLogic instances on potentially separate hardware. See figure below where a virtual destination "test_queue" has physical members test_queue_1, test_queue_2 and test_queue_3 on separate WebLogic instances:



As an example of Virtual Queues in action, in the conceptual figure shown below, Producers send messages to the Virtual Queue, which then directs each message to one of the physical destinations.



P: Producers
VQ: Virtual Queue
PQ-X: Physical Queue, where X is the number of the Queue
C: Consumer
M: Message
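
To make the routing idea concrete, here is a toy model in plain Java. It uses no JMS or WebLogic APIs; the class name VirtualQueueSketch is hypothetical, and the round-robin policy is an assumption made for illustration, so the policy a real WebLogic distributed queue applies may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of a virtual queue (VQ) in front of physical members (PQ-X). */
class VirtualQueueSketch {
    // Each inner queue stands in for one physical member on its own server.
    private final List<Queue<String>> members = new ArrayList<>();
    private int next = 0;

    VirtualQueueSketch(int memberCount) {
        for (int i = 0; i < memberCount; i++) {
            members.add(new ArrayDeque<>());
        }
    }

    /** Routes the message to exactly one member (round-robin is an assumed policy). */
    void send(String message) {
        members.get(next).add(message);
        next = (next + 1) % members.size();
    }

    /** Number of messages currently held by the given member. */
    int depth(int memberIndex) {
        return members.get(memberIndex).size();
    }

    public static void main(String[] args) {
        VirtualQueueSketch vq = new VirtualQueueSketch(3);
        for (int i = 1; i <= 6; i++) {
            vq.send("M" + i);
        }
        for (int i = 0; i < 3; i++) {
            System.out.println("PQ-" + (i + 1) + " holds " + vq.depth(i) + " message(s)");
        }
    }
}
```

The point of the sketch is only that each message lands on one member, so the load is spread across servers.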

In the case of a Virtual Topic, messages sent to the Virtual Topic are sprayed onto each and every physical member, as shown in the figure below, where VT and PT-X represent Virtual and Physical Topics respectively:


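In contrast to a queue, a topic fans out. A minimal plain-Java sketch of that behaviour follows; the class name VirtualTopicSketch is hypothetical, and this is a model of the idea rather than how WebLogic implements it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a virtual topic (VT) that copies each message to every member (PT-X). */
class VirtualTopicSketch {
    private final List<List<String>> members = new ArrayList<>();

    VirtualTopicSketch(int memberCount) {
        for (int i = 0; i < memberCount; i++) {
            members.add(new ArrayList<>());
        }
    }

    /** Every physical member receives its own copy of the message. */
    void publish(String message) {
        for (List<String> member : members) {
            member.add(message);
        }
    }

    /** Messages seen so far by the given member. */
    List<String> received(int memberIndex) {
        return members.get(memberIndex);
    }

    public static void main(String[] args) {
        VirtualTopicSketch vt = new VirtualTopicSketch(3);
        vt.publish("M1");
        vt.publish("M2");
        for (int i = 0; i < 3; i++) {
            System.out.println("PT-" + (i + 1) + " received " + vt.received(i));
        }
    }
}
```
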
Scaling Queues:
From the figure for Queues, it is simple to see how the system scales, and that it will continue to accept and consume new messages should one of the servers in the messaging cluster fail. Each message is consumed successfully by at most one of the many consumers. Adding Consumers to the physical members lets the architecture process messages more rapidly, i.e., the Competing Consumers design pattern.
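
The Competing Consumers idea can be sketched without a JMS provider by letting several threads drain one in-memory queue. Everything here (the class name CompetingConsumersSketch, the consumer names C-1, C-2, and the use of a LinkedBlockingQueue as a stand-in for a physical queue) is an assumption for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model: several consumers compete for messages on one physical queue. */
class CompetingConsumersSketch {

    /** Drains the messages with the given number of consumers; returns delivery counts per message. */
    static Map<String, Integer> drain(List<String> messages, int consumerCount) {
        BlockingQueue<String> physicalQueue = new LinkedBlockingQueue<>(messages);
        Map<String, Integer> deliveries = new ConcurrentHashMap<>();
        Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                String message;
                // poll() removes the head atomically, so no two consumers get the same message.
                while ((message = physicalQueue.poll()) != null) {
                    deliveries.merge(message, 1, Integer::sum);
                }
            }, "C-" + (i + 1));
            consumers[i].start();
        }
        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for consumers", e);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Map<String, Integer> deliveries = drain(List.of("M1", "M2", "M3", "M4", "M5"), 3);
        System.out.println("Delivery counts: " + deliveries);
    }
}
```

Adding consumers raises throughput without changing the producers, which is what makes queues easy to scale.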

Now, what if one of the servers hosting a physical member went down, for example the one hosting PQ-1? From the perspective of availability, messages are still flowing and getting consumed from the other available member, i.e., PQ-2. What about the messages that were in the physical member whose server went down? One has multiple options. The first and most straightforward is to restart the server and reconnect the consumers to drain the queue. WebLogic also allows automatic service migration in the event of failure, which is another feature that can assist in recovery. Queues lend themselves readily to scaling. One note: if message ordering comes into play, all bets are off :-) ... more on that in a subsequent blog post.

Scaling Topics:
Topics, unlike Queues, present an interesting challenge. In the case of a topic, a message is delivered to all listeners of the topic, and each listener typically differs from its peers in how it handles the message. For example, a message posted to an order topic might have one consumer that processes the order while another performs inventory calculations:


Clearly we cannot have two consumers that process the same order and it is redundant to have duplicate consumers perform the same inventory calculations.

Messaging systems have the concept of a Durable Subscriber, i.e., a message listener that registers with the topic so that it receives the messages sent to the topic even while it is not connected. The messaging provider stores the messages on behalf of the durable subscriber and forwards them when the subscriber comes back online.
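
As a rough illustration of what "stores the messages on behalf of the subscriber" means, here is a plain-Java model of a topic that keeps a backlog per durable subscription. The class and method names (DurableSubscriptionSketch, registerDurable, reconnect) are hypothetical and are not JMS API calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a topic that retains messages for durable subscriptions. */
class DurableSubscriptionSketch {
    // Backlog per durable subscription name, kept while the subscriber is away.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();

    /** Registers a named durable subscription; re-registering keeps the existing backlog. */
    void registerDurable(String subscriptionName) {
        pending.putIfAbsent(subscriptionName, new ArrayList<>());
    }

    /** Stores a copy of the message for every durable subscription. */
    void publish(String message) {
        for (List<String> backlog : pending.values()) {
            backlog.add(message);
        }
    }

    /** Called when the subscriber comes back online: hands over and clears the backlog. */
    List<String> reconnect(String subscriptionName) {
        List<String> backlog = pending.get(subscriptionName);
        if (backlog == null) {
            throw new IllegalArgumentException("Unknown subscription: " + subscriptionName);
        }
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    public static void main(String[] args) {
        DurableSubscriptionSketch orderTopic = new DurableSubscriptionSketch();
        orderTopic.registerDurable("inventory");
        orderTopic.publish("order-1"); // subscriber is offline here
        orderTopic.publish("order-2");
        System.out.println("Delivered on reconnect: " + orderTopic.reconnect("inventory"));
    }
}
```
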

In WebLogic, one cannot create a durable subscription using the JNDI name of a distributed (virtual) destination. One has to explicitly register with a physical member. This is one gripe that I have with the WebLogic solution. Adding to the problem, if one connects to one physical member and that member goes down, an attempt to establish the durable subscription on another available member is treated as a totally new subscription. As a consolation, if the server that contains the durable subscriptions goes down, messages are stored on the other physical members of the virtual topic on behalf of the durable subscribers. When the server is restored and the subscribers come back online, the messages are forwarded to them, as shown below:


Now, because each subscriber of the topic has an agenda distinct from the other subscribers and is the sole entity that services its messages, it represents a single point of failure and also a scalability inhibitor, as it cannot process multiple messages concurrently.

So how can one scale a topic so that messages for a given type of subscriber can be processed rapidly? One direction is to have the consumer of a Topic post the message onto a Queue, which can then be processed by competing consumers. One way to achieve this is to have a Message Driven Bean (MDB) within the container whose sole purpose is to transfer messages from the topic to a queue for those consumers that need to scale. An example is shown below:


In the example above, because one consumer of the topic needs its messages processed rapidly, an MDB (registered as a durable subscriber of the topic) transfers every message onto a Queue, which is then consumed by competing consumers. As the other consumers, DC-1 and DC-2, do not need to scale, they remain single durable subscribers.
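
The shape of this arrangement can be modelled in a few lines of plain Java: one topic subscriber does nothing but copy each message onto a work queue, while the other subscribers handle messages themselves. The names TopicToQueueSketch, dc1 and workQueue are hypothetical, and in-memory collections stand in for the JMS destinations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Toy model of a topic whose subscribers each receive every message. */
class TopicToQueueSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    public static void main(String[] args) {
        TopicToQueueSketch orderTopic = new TopicToQueueSketch();

        // DC-1: an ordinary subscriber that handles messages itself.
        List<String> dc1 = new ArrayList<>();
        orderTopic.subscribe(dc1::add);

        // Transfer subscriber: only forwards each message onto a queue,
        // from which competing consumers could then take work.
        BlockingQueue<String> workQueue = new LinkedBlockingQueue<>();
        orderTopic.subscribe(workQueue::add);

        orderTopic.publish("order-1");
        orderTopic.publish("order-2");
        System.out.println("DC-1 saw " + dc1.size() + ", work queue holds " + workQueue.size());
    }
}
```

The transfer subscriber stays trivial, so the heavy processing moves behind the queue where consumers can be added freely.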

The above solution works to scale the durable subscriber of a topic. However, in WebLogic when using distributed topics the durable MDB still represents a single point of failure.

A question to ask at this point is, "Do you really need a topic for the use case at hand?" Topics are great IMO when the subscribers tend to be dynamic in nature or do not need to be durable. An example of the former is a stock quote topic, and an example of the latter is an event listener that requires notification of an event.

When the number of subscribers is known and finite, a queue coupled with the MDB can easily achieve a scalable solution, as shown below, where the MDB transfers messages from the in-queue to the out-queues, on which competing consumers can scale and consume the messages:



Transfer MDB:
It is pretty trivial to create the Transfer MDB. One would typically like to reuse the Transfer MDB Java code with only configuration changes for different applications. This can easily be achieved via separate projects in which the target destinations are provided via @Resource. A simple transfer MDB is shown below:



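import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;

import org.apache.commons.lang.StringUtils; // assumed: Apache Commons Lang 2.x

@MessageDriven
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public class MessageRouterMdb implements MessageListener {

    private static final Logger LOG = Logger.getLogger(MessageRouterMdb.class.getName());

    /**
     * Map this jms connection factory to the actual one in the weblogic-ejb-jar.xml deployment
     * descriptor of the ejb project. The Connection factory to be used will be the one that is
     * specific to this mdb.
     */
    @Resource(name = "jms/connectionFactory")
    private ConnectionFactory connectionFactory;

    /**
     * Injected in Message Driven Context
     */
    @Resource
    private MessageDrivenContext mdc;

    /**
     * Define a comma separated list of destinations JNDI names in the ejb-jar.xml via an environment
     * entry.
     */
    @Resource
    private String targetDestinations;

    // Initialized in init()
    private List<Destination> targetJmsDestinations;

    private MessageProducer messageProducer;

    private Connection connection;

    private Session session;

    @PostConstruct
    public void init() {
        try {
            InitialContext ctx = new InitialContext();
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // An unbound producer; the destination is supplied on each send.
            messageProducer = session.createProducer(null);
            String[] targetDestArray = StringUtils.split(targetDestinations, ",");

            targetJmsDestinations = new ArrayList<Destination>();
            for (String targetDest : targetDestArray) {
                // trim() tolerates spaces after the commas in the configured list
                targetJmsDestinations.add((Destination) ctx.lookup(targetDest.trim()));
            }

            connection.start();
        }
        catch (Exception e) {
            throw new RuntimeException("Error Initializing Message Router Mdb", e);
        }
    }

    public void onMessage(Message message) {
        try {
            // Loop through target destinations and send message to each destination
            for (Destination dest : targetJmsDestinations) {
                messageProducer.send(dest, message);
            }
        }
        catch (Exception e) {
            // Log before rolling back so that the failure is not silently swallowed
            LOG.log(Level.SEVERE, "Error routing message, rolling back", e);
            mdc.setRollbackOnly();
        }
    }

    @PreDestroy
    public void cleanup() {
        // Closing the connection also closes its sessions and producers.
        try {
            if (connection != null) {
                connection.close();
            }
        }
        catch (JMSException e) {
            LOG.log(Level.WARNING, "Error closing JMS connection", e);
        }
    }
}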




The code for the above MDB and an example Maven project that uses WebLogic are available for download here. Note that the code is not specific to WebLogic and can really be used in any container, for example JBoss.

Conclusion:

Scaling topics via an MDB and target queues just felt like an obvious pattern to me. WebLogic, like many JMS providers, has the concept of a Message Bridge. Initial investigation led me to believe that a message bridge serves more to transfer messages from one provider to another than to scale a Topic and distribute messages to multiple destinations.

There may be cases, as mentioned, where the number of subscribers is not known. In such a case, a topic might work, and it becomes the responsibility of the subscribers to scale on their end, for example by multi-threading upon receiving the message.
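
For a standalone subscriber (outside a container, where creating threads is generally acceptable), multi-threading on receipt could look roughly like the sketch below. The class name ThreadedSubscriberSketch is hypothetical and the onMessage method here takes a plain String rather than a JMS Message. One caveat to keep in mind: once the callback returns, the provider may already consider the message delivered, so work still queued in the pool could be lost if the process dies.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a subscriber that hands each message to a worker pool. */
class ThreadedSubscriberSketch {
    private final ExecutorService workers;
    private final AtomicInteger processed = new AtomicInteger();

    ThreadedSubscriberSketch(int threads) {
        workers = Executors.newFixedThreadPool(threads);
    }

    /** Stand-in for the listener callback: returns quickly, real work runs on a pool thread. */
    void onMessage(String message) {
        workers.execute(() -> {
            // Placeholder for the actual processing of the message.
            processed.incrementAndGet();
        });
    }

    /** Stops accepting work, waits for in-flight messages, and returns how many were processed. */
    int shutdownAndCount() {
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        ThreadedSubscriberSketch subscriber = new ThreadedSubscriberSketch(4);
        for (int i = 1; i <= 20; i++) {
            subscriber.onMessage("M" + i);
        }
        System.out.println("Processed " + subscriber.shutdownAndCount() + " messages");
    }
}
```
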

What would really be nice is if the JMS specification had the concept of "Durable Competing Consumers", where a Durable subscriber could register multiple instances of itself such that a message sent to a topic would be delivered to at most one of them, i.e., it or one of its clones :-). I believe there are some messaging providers that permit such a pattern via custom extensions to the JMS specification.

Before bidding adieu, this has been one of my most figurative blogs, all Ye artists, look at the diagrams with a forgiving eye, "I am a programmer not an artist!" :-)