Showing posts with label JMS Patterns. Show all posts

Wednesday, December 28, 2011

WebLogic JMS Partitioned Distributed Topics and Shared Subscriptions

I had previously blogged about the challenges of scaling topics with durable subscribers. In JMS, a topic can typically have only a single active subscriber for a particular [Client ID, Subscription Name] tuple.

In the example shown below, the Order Topic has two durable subscribers, the Order Processor and the Auditor. Both receive every message sent to the topic, but at any given time there can be exactly one instance of each consuming messages. Unlike queues, a durable subscription cannot have competing consumers. This translates into a scalability and availability challenge:
Oracle WebLogic has introduced the concept of a Partitioned Distributed Topic and shared subscriptions, which allow more than one consumer to share a durable subscription on behalf of a particular subscriber. With a shared subscription, the consumers of an "application" compete for the messages of a topic, and this feature promotes scalability and availability. In the figure shown below there are two Order Processors competing for messages, and likewise two Auditors. A message is never delivered to both consumers of the same "application":

WebLogic introduces the concept of an UNRESTRICTED Client ID policy. With this setting on a connection factory, more than one connection in a cluster can share the same Client ID. The standard option, RESTRICTED, enforces that only a single connection with a particular Client ID can exist in a cluster.
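To make the difference between the two policies concrete, here is a minimal in-memory sketch. It is a toy model and not WebLogic code: `ClientIdRegistry`, `connect` and `disconnect` are hypothetical names invented for the illustration, and the sketch assumes that only RESTRICTED connections need to be tracked.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a cluster-wide Client ID check; hypothetical, not a WebLogic API. */
class ClientIdRegistry {
    enum Policy { RESTRICTED, UNRESTRICTED }

    // Client IDs currently held by a RESTRICTED connection
    private final Set<String> restrictedIds = new HashSet<>();

    /**
     * Returns true if a connection with this Client ID may be opened.
     * RESTRICTED: only one connection per Client ID at a time.
     * UNRESTRICTED: any number of connections may share the Client ID.
     */
    synchronized boolean connect(String clientId, Policy policy) {
        if (policy == Policy.UNRESTRICTED) {
            return true;
        }
        // Set.add returns false when the ID is already taken
        return restrictedIds.add(clientId);
    }

    /** Releases the Client ID so that another RESTRICTED connection can claim it. */
    synchronized void disconnect(String clientId, Policy policy) {
        if (policy == Policy.RESTRICTED) {
            restrictedIds.remove(clientId);
        }
    }
}
```

With this model, a second `connect("orderConnectionId", Policy.RESTRICTED)` is refused until the first connection disconnects, while any number of UNRESTRICTED connections with the same ID are accepted.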

Sharing Subscriptions:
There are two policies for Subscription sharing:

1. Exclusive - This is the default and all subscribers created using the connection factory cannot share subscriptions with any other subscribers.

2. Sharable - Subscribers created using this connection factory can share their subscriptions with other subscribers. Consumers can share a durable subscription if they have the same [Client ID, Client ID policy and subscription name].

To promote HA, set the Subscription Sharing Policy on the connection factory to Sharable.
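The sharing rule can be pictured with a small in-memory sketch. It is a toy model, not WebLogic code: `SharedSubscriptionTopic` is a hypothetical class, and handing messages to the consumers of one subscription in round-robin order is an assumption made only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory topic illustrating shared durable subscriptions; hypothetical. */
class SharedSubscriptionTopic {
    /** One subscription, identified by [Client ID, Client ID policy, subscription name]. */
    private static final class Subscription {
        final List<List<String>> consumerInboxes = new ArrayList<>();
        int next; // index of the consumer that receives the next message
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    /** Registers a consumer and returns the inbox that collects its messages. */
    List<String> subscribe(String clientId, String clientIdPolicy, String subscriptionName) {
        // Consumers with an identical tuple end up sharing the same subscription
        String key = clientId + "|" + clientIdPolicy + "|" + subscriptionName;
        Subscription subscription = subscriptions.get(key);
        if (subscription == null) {
            subscription = new Subscription();
            subscriptions.put(key, subscription);
        }
        List<String> inbox = new ArrayList<>();
        subscription.consumerInboxes.add(inbox);
        return inbox;
    }

    /** Every subscription sees the message, but only one of its consumers receives it. */
    void publish(String message) {
        for (Subscription subscription : subscriptions.values()) {
            subscription.consumerInboxes.get(subscription.next).add(message);
            subscription.next = (subscription.next + 1) % subscription.consumerInboxes.size();
        }
    }
}
```

Two Order Processors that subscribe with the same tuple split the messages between them, while a lone Auditor with its own tuple still receives every message.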

When creating a Topic in WebLogic, set the Forwarding Policy to Partitioned. This causes a message sent to a partitioned distributed topic to be delivered to a single physical member. In addition, the message will not be forwarded to other members of the cluster, even if there are no consumers on that physical member.
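A consequence of the Partitioned policy is that a backlog builds up on any member that has no consumer attached. The following toy model sketches that behaviour. It is not WebLogic code: `PartitionedTopicModel` is a hypothetical class, and choosing the target member in round-robin order is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a partitioned distributed topic with one subscription; hypothetical. */
class PartitionedTopicModel {
    private final List<Deque<String>> members = new ArrayList<>();
    private int nextMember; // round-robin choice is an assumption of this sketch

    PartitionedTopicModel(int memberCount) {
        for (int i = 0; i < memberCount; i++) {
            members.add(new ArrayDeque<String>());
        }
    }

    /** The message lands on exactly one physical member and is never forwarded. */
    int send(String message) {
        int target = nextMember;
        members.get(target).add(message);
        nextMember = (nextMember + 1) % members.size();
        return target;
    }

    /** A consumer attached to a member can only drain that member's backlog. */
    List<String> drain(int memberIndex) {
        Deque<String> backlog = members.get(memberIndex);
        List<String> received = new ArrayList<>(backlog);
        backlog.clear();
        return received;
    }

    /** Number of messages still waiting on the given member. */
    int backlog(int memberIndex) {
        return members.get(memberIndex).size();
    }
}
```

If four messages are sent to a two-member topic and a consumer drains only member 0, half of the messages stay on member 1 until something connects to it.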

If we want an HA solution, then the listeners will need to connect to each and every physical member across the cluster. Consider the following figure: on Consumer Machine 1 there are two consumers of Type A and two consumers of Type B, and the same applies to Consumer Machine 2. It is important to note that the consumers of a type or application connect to both physical members of the cluster. Doing so ensures that if a consumer machine dies unexpectedly, the other consumer machine can continue to function, ensuring availability:


The above can be achieved using Spring Framework's Message Listener Container with some wrapper code, as shown below. The PartitionedTopicContainer is a container of containers, each connecting to one physical member of the topic with the same client ID and subscription name:
public class PartitionedTopicContainer {
  private final List<DefaultMessageListenerContainer> containers;

  public PartitionedTopicContainer(String clientID, String subscriptionName, ConnectionFactory connectionFactory, Destination ...physicalTopicMembers) {
    this.containers = Lists.newArrayList();

    for (Destination physicalMember : physicalTopicMembers) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

      container.setConnectionFactory(connectionFactory);
      container.setDestination(physicalMember);
      container.setClientId(clientID);
      container.setDurableSubscriptionName(subscriptionName);

      // Track the container so the listener, start and shutdown calls below reach it
      containers.add(container);
    }
  }

  public void setMessageListener(Object listener) {
    for (DefaultMessageListenerContainer container : containers) {
      container.setMessageListener(listener);
    }
  }

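  public void start() {
    for (DefaultMessageListenerContainer container : containers) {
      // Outside a Spring application context the container has to be initialized by hand
      container.afterPropertiesSet();
      container.start();
    }
  }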

  public void shutdown() {
    for (DefaultMessageListenerContainer container : containers) {
      container.shutdown();
    }
  }
}
The above container could then be used as follows:
// Obtain each physical member of the partitioned distributed topic
Destination physicalMember1 = (Destination) context.lookup("Server1@orderTopic");
Destination physicalMember2 = (Destination) context.lookup("Server2@orderTopic");

// Container for the Order Processor
PartitionedTopicContainer subscriber1 = new PartitionedTopicContainer("orderConnectionId", "orderProcSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber1.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber order processor got a message...");
    ...
  }
});
// Container for the Auditor
PartitionedTopicContainer subscriber2 = new PartitionedTopicContainer("auditorConnectionId", "auditorSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber2.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber auditor got a message...");
    ...
  }
});

subscriber1.start();
subscriber2.start();


The parts of the code above where a consumer of the API has to look up each and every physical member and hand them to the container are a lot of boilerplate, and they do not account well for cases where a physical member becomes available or unavailable. Luckily, WebLogic provides the JMSDestinationAvailabilityHelper API, which is a way to listen for events relating to physical member availability and unavailability. The PartitionedTopicContainer shown above could easily be augmented with the availability helper API so that it gets notified of physical destination availability and unavailability and correspondingly starts and stops the internal container for that physical destination. Pseudo-code of how this can be achieved with the above container is shown below:

public class PartitionedTopicContainer implements DestinationAvailabilityListener {
  private final String clusterUrl;

  private final String clientID;

  private final String subscriptionName;

  private final String partDistTopicJndi;

  private final ConnectionFactory connectionFactory;

  @GuardedBy("containerLock")
  private final Map<String, DefaultMessageListenerContainer> containerMap;

  private final Object containerLock = new Object();

  // WebLogic Handle
  private RegistrationHandle registrationHandle;

  private final CountDownLatch startLatch = new CountDownLatch(1);

  public PartitionedTopicContainer(String clientID, String subscriptionName, String clusterUrl,
    ConnectionFactory connectionFactory, String partDistTopicJndi) {
    this.clusterUrl = clusterUrl;
    this.clientID = clientID;
    this.subscriptionName = subscriptionName;
    this.partDistTopicJndi = partDistTopicJndi;
    this.containerMap = Maps.newHashMap();
    this.connectionFactory = connectionFactory;
  }

  public void start() throws InterruptedException {
    Hashtable<String, String> jndiProperties = ...;
    jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
    jndiProperties.put(Context.PROVIDER_URL, clusterUrl);
    
    JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
   
    // Register this container as a listener for destination events
    registrationHandle = dah.register(jndiProperties, partDistTopicJndi, this);
   
    // Wait for listener notification to start container
    startLatch.await();
  }

  @Override
  public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
    synchronized (containerLock) {
      // For all Physical destinations, start a container
      for (DestinationDetail detail : physicalAvailableMembers) {
        Destination physicalMember = lookupPhysicalTopic(detail.getJNDIName());
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        
        container.setConnectionFactory(connectionFactory);
        container.setDestination(physicalMember);
        container.setClientId(clientID);
        container.setDurableSubscriptionName(subscriptionName);
        System.out.println("Starting Container for physical Destination:" + detail);
        container.start();
        containerMap.put(detail.getJNDIName(), container);
      }
    }
    startLatch.countDown();
  }
  
  @Override
  public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
    synchronized (containerLock) {
      // Shutdown all containers whose physical members are no longer available
      for (DestinationDetail detail : physicalUnavailableMembers) {
        DefaultMessageListenerContainer container = containerMap.remove(detail.getJNDIName());
        // The member may never have had a container, so guard against null
        if (container != null) {
          container.shutdown();
        }
      }
    }
  }

  @Override
  public void onFailure(String destJndiName, Exception exception) {
    // Looks like a cluster wide failure
    shutdown();
  }

  public void shutdown() {
    // Unregister for events about destination availability
    registrationHandle.unregister();

    // Shut down containers
    synchronized (containerLock) {
       for (Iterator<Map.Entry<String, DefaultMessageListenerContainer>> i = containerMap.entrySet().iterator(); i
          .hasNext();) {
        Map.Entry<String, DefaultMessageListenerContainer> entry = i.next();
        System.out.println("Shutting down container for:" + entry.getKey());
        entry.getValue().shutdown();
        i.remove();
      }
    }  
  }
}   

Some things to remember when creating a partitioned distributed topic and shared subscription:

1. The Connection Factory being used should have "Subscription Sharing Policy" set to "Sharable"
2. The Forwarding Policy on the Partitioned Distributed Topic should be set to "Partitioned"
3. Message forwarding will not occur, so subscribers must ensure connections exist to every physical member, or else messages can pile up for the subscription on the unconnected member
4. If a server hosting a physical member is unavailable, then messages on that physical topic will be unavailable until the server is made available again.

Partitioned Distributed Topics and Shared Subscriptions look promising. One thing I still need to sort out is how to handle error destinations at a per-subscription level with WebLogic. Any passer-by with thoughts, please do shoot them my way.

Tuesday, June 23, 2009

Scaling in the JMS World...in particular Topics

Two recurring concerns in JMS are scalability and availability. For the former, as load on the messaging system increases, one would want to be able to service the additional messages without performance degradation. For the latter, one should reduce, if not eliminate, single points of failure, i.e., ensure the system remains available for use.

Most JMS providers have some sort of mechanism for addressing the above. This blog post, however, concentrates only on Oracle WebLogic.

WebLogic has an interesting way in which JMS Destinations (Queues and Topics) are scaled via Distributed or Virtual Destinations. A Virtual Destination can be considered a sort of router/load balancer that distributes messages it receives over to actual or physical destinations. Each physical destination would reside in separate WebLogic instances on potentially separate hardware. See figure below where a virtual destination "test_queue" has physical members test_queue_1, test_queue_2 and test_queue_3 on separate WebLogic instances:



As an example of Virtual Queues in action, in the conceptual figure shown below, Producers send messages to the Virtual Queue, which then directs each message to one of the physical destinations:



P : Producers
VQ: Virtual Queue
PQ-X: Physical Queue, where X is the number of the Queue
C: Consumer
M: Message

In the case of a Virtual Topic, messages sent to the Virtual Topic are sprayed onto each and every physical member, as shown in the figure below, where VT and PT-X represent Virtual and Physical Topics respectively:
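Setting the figures aside, the two routing behaviours can be contrasted with a tiny in-memory sketch. It is a toy model rather than WebLogic code: `VirtualDestinationModel` and its methods are hypothetical names, and picking queue members in round-robin order is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a virtual destination routing to its physical members; hypothetical. */
class VirtualDestinationModel {
    final List<List<String>> members = new ArrayList<>();
    private int next; // next member to receive a queue message (round robin assumed)

    VirtualDestinationModel(int memberCount) {
        for (int i = 0; i < memberCount; i++) {
            members.add(new ArrayList<String>());
        }
    }

    /** Virtual queue behaviour: exactly one physical member receives the message. */
    int sendAsQueue(String message) {
        int target = next;
        members.get(target).add(message);
        next = (next + 1) % members.size();
        return target;
    }

    /** Virtual topic behaviour: every physical member receives a copy of the message. */
    void sendAsTopic(String message) {
        for (List<String> member : members) {
            member.add(message);
        }
    }
}
```

Sending four messages with `sendAsQueue` to three members leaves them holding two, one and one message, whereas a single `sendAsTopic` call leaves one copy on every member.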


Scaling Queues:
From the figure shown regarding Queues, it is pretty simple to follow how the system scales and how it will continue to accept and consume new messages should one of the servers in the messaging cluster fail. A message will be successfully consumed by at most one of the many consumers. Adding consumers to the physical members allows the architecture to process messages rapidly, i.e., the Competing Consumers design pattern.
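The Competing Consumers idea can be tried out without a JMS provider. The sketch below is plain Java and stands in a `LinkedBlockingQueue` for a physical queue; `CompetingConsumersDemo` and `consume` are hypothetical names, and the ten-second timeout is an arbitrary choice for the illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java stand-in for competing consumers on one physical queue; hypothetical. */
class CompetingConsumersDemo {
    /** Drains the messages with several consumer threads and returns deliveries per message. */
    static Map<String, Integer> consume(List<String> messages, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        Map<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.execute(() -> {
                String message;
                // poll() hands each queued message to exactly one of the competing threads
                while ((message = queue.poll()) != null) {
                    deliveries.computeIfAbsent(message, k -> new AtomicInteger()).incrementAndGet();
                }
            });
        }
        consumers.shutdown();
        try {
            if (!consumers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Map<String, Integer> counts = new HashMap<>();
        deliveries.forEach((message, count) -> counts.put(message, count.get()));
        return counts;
    }
}
```

Provided the message texts are unique, every entry in the returned map has a count of one no matter how many consumers compete, which is the property that lets a queue scale by simply adding consumers.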

Now, what if one of the servers hosting a physical member went down, for example the one hosting PQ-1? From the perspective of availability, messages are still flowing and getting consumed from the other available member, i.e., PQ-2. What about the messages that were in the physical member whose server went down? One has multiple options. The first and most straightforward is to restart the server and re-connect the consumers to drain the queue. WebLogic also allows for automatic service migration in the event of failure, which is another feature that can assist in recovering from failure. Queues lend themselves readily to scaling. One note on the same: if message ordering is put into play, all bets are off :-). More on that in a subsequent blog.

Scaling Topics:
Topics, unlike Queues, present an interesting challenge. In the case of a topic, a message is delivered to all listeners of the topic, and each listener typically differs from its peers in how it handles the message. For example, a message posted to an order topic might have one consumer that processes the order while another looks at inventory calculations:


Clearly we cannot have two consumers that process the same order and it is redundant to have duplicate consumers perform the same inventory calculations.

Messaging systems have the concept of a Durable Subscriber, i.e., a message listener that registers with the topic so that it is delivered the messages sent to the topic even when the listener is not connected. The messaging provider stores the messages on behalf of the durable subscriber and forwards them when the subscriber comes back online.
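The store-and-forward behaviour of a durable subscription can be sketched in a few lines of plain Java. This is a toy model and not a JMS implementation: `DurableTopicModel` and its methods are hypothetical names, and it assumes one consumer per subscription.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a topic that stores messages for offline durable subscribers; hypothetical. */
class DurableTopicModel {
    // Messages held on behalf of each durable subscription while it is offline
    private final Map<String, Deque<String>> stored = new LinkedHashMap<>();
    // Inboxes of the subscribers that are currently connected
    private final Map<String, List<String>> connected = new HashMap<>();

    /** Registers or reconnects a subscriber; anything stored while it was away comes first. */
    List<String> connect(String subscriptionName) {
        Deque<String> pending = stored.computeIfAbsent(subscriptionName, k -> new ArrayDeque<String>());
        List<String> inbox = new ArrayList<>(pending);
        pending.clear();
        connected.put(subscriptionName, inbox);
        return inbox;
    }

    /** The subscription survives the disconnect; only the live inbox goes away. */
    void disconnect(String subscriptionName) {
        connected.remove(subscriptionName);
    }

    /** Delivers to connected subscribers and stores the message for the others. */
    void publish(String message) {
        for (Map.Entry<String, Deque<String>> entry : stored.entrySet()) {
            List<String> inbox = connected.get(entry.getKey());
            if (inbox != null) {
                inbox.add(message);
            } else {
                entry.getValue().add(message);
            }
        }
    }
}
```

A subscriber that connects, disconnects and later reconnects receives on reconnect exactly the messages that were published during its absence.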

In WebLogic, one cannot create a durable subscription using the JNDI name of a distributed (virtual) destination. One has to explicitly register with a physical member. This is one gripe that I have with the WebLogic solution. Adding to the problem, should one connect to one physical member and that member goes down, an attempt to establish the durable subscription on another available member is treated as a totally new subscription. In consolation, if the server that contains the durable subscriptions goes down, messages will be stored on the other physical members of the virtual topic on behalf of the durable subscribers, and when the server is restored, the messages get forwarded to the durable subscribers when they come back online, as shown below:


Now, each subscriber of the topic has an agenda distinct from that of the other subscribers, and it is the sole entity that services its messages. It therefore represents a single point of failure and proves to be a scalability inhibitor, as it cannot process multiple messages concurrently.

So how can one scale a topic so that the messages for a given type of subscriber can be rapidly processed? One direction is to have the consumer of a Topic post the message onto a Queue, which can then be processed by competing consumers. One way this can be achieved is to have a Message Driven Bean within the container whose sole purpose is to transfer the message from the topic to a queue for those consumers that need to scale. An example of the same is shown below:


In the example shown above, one consumer of the topic needs the messages it receives to be processed rapidly, so an MDB (registered as a durable subscriber of the topic) is used to transfer every message onto a Queue, which is then consumed by competing consumers. As the other consumers, DC-1 and DC-2, are not in need of scaling, they remain single durable subscribers.
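The shape of this topic-to-queue transfer can be sketched in plain Java before bringing an MDB into the picture. The sketch is a toy model: `TopicToQueueBridge` is a hypothetical class, a `LinkedBlockingQueue` stands in for the JMS queue, and the transfer subscriber added in the constructor plays the role of the MDB.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model: one topic subscriber moves messages onto a queue for competing workers. */
class TopicToQueueBridge {
    private final List<Consumer<String>> topicSubscribers = new ArrayList<>();
    private final BlockingQueue<String> workQueue = new LinkedBlockingQueue<>();

    TopicToQueueBridge() {
        // The transfer subscriber only moves the message; it does no processing itself
        topicSubscribers.add(workQueue::add);
    }

    /** Adds an ordinary single subscriber such as DC-1 or DC-2. */
    void addSubscriber(Consumer<String> subscriber) {
        topicSubscribers.add(subscriber);
    }

    /** Topic semantics: every subscriber receives every message. */
    void publish(String message) {
        for (Consumer<String> subscriber : topicSubscribers) {
            subscriber.accept(message);
        }
    }

    /** Runs competing workers until the expected number of messages has been processed. */
    Set<String> processWith(int workerCount, int expectedMessages) {
        Set<String> processed = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(expectedMessages);
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            workers.execute(() -> {
                try {
                    while (true) {
                        processed.add(workQueue.take());
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the worker
                }
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow();
        }
        return processed;
    }
}
```

The single subscribers still see every message directly from the topic, while the work behind the transfer subscriber is spread over as many workers as needed.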

The above solution works to scale the durable subscriber of a topic. However, in WebLogic when using distributed topics the durable MDB still represents a single point of failure.

A question to ask at this point is, "Do you really need a topic for the use case at hand?" Topics are great, in my opinion, when the subscribers tend to be dynamic in nature or do not need to be durable. An example of the former is a stock quote topic, and an example of the latter is an event listener that requires notification of an event.

When the number of subscribers is known and finite, the use of a queue coupled with the MDB can easily achieve a scalable solution, as shown below, where the MDB transfers messages from the in-queue to subsequent out-queues on which competing consumers can scale and consume the messages:



Transfer MDB:
It is pretty trivial to create the Transfer MDB. One would typically like to re-use the Transfer MDB Java code with only configuration changes for different applications. This can easily be achieved via separate projects in which the target destinations are provided via @Resource. A simple transfer MDB is shown below:



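import java.util.ArrayList;
import java.util.List;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;

// Assumed to be the Apache Commons Lang StringUtils; the original listing does not show its import
import org.apache.commons.lang.StringUtils;

@MessageDriven
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public class MessageRouterMdb implements MessageListener {

  /**
   * Map this jms connection factory to the actual one in the weblogic-ejb-jar.xml deployment
   * descriptor of the ejb project. The Connection factory to be used will be the one that is
   * specific to this mdb.
   */
  @Resource(name = "jms/connectionFactory")
  private ConnectionFactory connectionFactory;

  /**
   * Injected in Message Driven Context
   */
  @Resource
  private MessageDrivenContext mdc;

  /**
   * Define a comma separated list of destinations JNDI names in the ejb-jar.xml via an environment
   * entry.
   */
  @Resource
  private String targetDestinations;

  // Initialized in init()
  private List<Destination> targetJmsDestinations;

  private MessageProducer messageProducer;

  private Connection connection;

  private Session session;

  @PostConstruct
  public void init() {
    try {
      InitialContext ctx = new InitialContext();
      connection = connectionFactory.createConnection();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // An unbound producer, the destination is supplied on every send
      messageProducer = session.createProducer(null);

      String[] targetDestArray = StringUtils.split(targetDestinations, ",");
      targetJmsDestinations = new ArrayList<Destination>();
      for (String targetDest : targetDestArray) {
        targetJmsDestinations.add((Destination) ctx.lookup(targetDest));
      }

      connection.start();
    }
    catch (Exception e) {
      throw new RuntimeException("Error Initializing Message Router Mdb", e);
    }
  }

  public void onMessage(Message message) {
    try {
      // Loop through target destinations and send message to each destination
      for (Destination dest : targetJmsDestinations) {
        messageProducer.send(dest, message);
      }
    }
    catch (Exception e) {
      // Roll back so that the container redelivers the message
      mdc.setRollbackOnly();
    }
  }

  @PreDestroy
  public void cleanup() {
    // Closing the connection also closes its sessions and producers
    try {
      if (connection != null) {
        connection.close();
      }
    }
    catch (JMSException e) {
      // Nothing more can be done while the bean is being destroyed
    }
  }
}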




The CODE for the above MDB and an example Maven Project that uses WebLogic is available for download HERE. Note that the code is not specific to WebLogic and can really be used in any container, for example JBoss.

Conclusion:

Scaling topics via an MDB and target queues just felt like an obvious pattern to me. WebLogic, like many JMS providers, has the concept of a Message Bridge. Initial investigation led me to believe that a message bridge serves the purpose of transferring a message from one provider to another rather than scaling a Topic and distributing messages to multiple destinations.

There may be cases, as mentioned, where the number of subscribers is not known. In such a case, a topic might work, and it becomes the responsibility of the subscribers to scale on their end, for example by multi-threading upon receiving the message.

What would be really nice is if the JMS specification had the concept of "Durable Competing Consumers", where a Durable subscriber could register multiple instances of itself such that a message sent to a topic would be delivered to at most one of it or its clones :-). I believe there are some messaging providers who permit such a pattern via customizations that extend the JMS specification.

Before bidding adieu, this has been one of my most figurative blogs, all Ye artists, look at the diagrams with a forgiving eye, "I am a programmer not an artist!" :-)