
Tuesday, January 3, 2012

Apache ZooKeeper - A maven example

Let me begin with - Happy New Year! I am starting this year's BLOG entries with some playing I did over the holidays with Apache ZooKeeper. So what is Apache ZooKeeper? Broadly put, ZooKeeper provides a service for maintaining distributed naming, configuration and group membership. ZooKeeper provides a centralized coordination service which is distributed, consistent and highly available. In particular, it specializes in coordination tasks such as leader election, status propagation and rendezvous. Quoting the ZooKeeper documentation, "The motivation behind ZooKeeper is to relieve distributed applications the responsibility of implementing coordination services from scratch."

ZooKeeper facilitates the coordination of distributed systems via a hierarchical namespace of ZNodes. A ZNode can have child ZNodes as well as data associated with it (figure linked from the official ZooKeeper documentation):

ZooKeeper also has the concept of persistent Nodes and Ephemeral Nodes. The former survives the death of the client that created it while the latter does not. Nodes can also be SEQUENTIAL, where a SEQUENCE number is appended to the node name. Sequences clearly lend themselves to queue-type or ordered behavior when required. A ZNode cannot be deleted as long as it has children under it.

The ZooKeeper API itself is a very simple programming interface that contains methods for CRUD of a ZNode and its data, along with a way to listen for changes to the node and/or its data.
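For a flavor of the raw API, here is a minimal sketch using the standard org.apache.zookeeper.ZooKeeper client; the paths and data are purely illustrative:

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkCrudSketch {
  public static void main(String[] args) throws Exception {
    // Connect to a local ZooKeeper server
    ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, new Watcher() {
      public void process(WatchedEvent event) {
        System.out.println("Event:" + event);
      }
    });

    // Create a persistent ZNode with some data
    zk.create("/echo", "config".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // Create an ephemeral, sequential child; ZooKeeper appends a sequence number to the name
    String member = zk.create("/echo/n_", "5555".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
      CreateMode.EPHEMERAL_SEQUENTIAL);

    // Read data and children, registering the default watcher for changes
    byte[] data = zk.getData(member, true, null);
    List<String> children = zk.getChildren("/echo", true);
    System.out.println(new String(data) + " " + children);

    // Update and delete (-1 ignores the expected version)
    zk.setData(member, "5556".getBytes(), -1);
    zk.delete(member, -1);
    zk.close();
  }
}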

One can read quite a bit about ZooKeeper from its documentation and I will not delve into details. My goal in this BLOG is to provide some simple examples of how ZooKeeper can be used for:

1. Leader Election
2. Rendezvous or Barrier

1. Leader Election Example:

As mentioned previously, ZooKeeper specializes in Leader Election, where a particular server of a given type is elected the leader and, upon its demise, another stands up to take its place. The concept of Ephemeral nodes is pretty useful in such a scenario, where a service upon start up registers itself as a candidate in the pool of resources to be used. For the sake of discussion, consider an Echo Service with the fictional constraint that only one Echo Service can be servicing requests at any given time, but should that robust service die, another one is ready to resume the responsibility of servicing clients. The Echo Service on start up registers an ephemeral node at "/echo/n_000000000X". It further registers a watcher on its predecessor Echo Service, denoted by the ZNode "/echo/n_000000000(X-1)", with the intent that should the predecessor die, the node in question can assume leadership if it is the logical successor, as shown in the picture below:
Leader Election of Echo Server



The reason for registering a watch on the predecessor rather than on the root "/echo" node is to prevent herding, as described in the ZooKeeper recipes. In other words, in the event of a leader being decommissioned, we wouldn't want all the Echo Server leadership contenders to stress the system by asking ZooKeeper for the children of the "/echo" node to determine the next leader. With the chosen strategy, if the predecessor dies, only its follower executes the call to get the children of "/echo". The example provided herewith spawns two Echo Servers and has a client that calls the server. One of the two servers is considered the leader and assumes the responsibility of servicing the requests of the client. If the leader dies, the successor Echo Server assumes leadership and starts servicing client requests. The client will connect to the next leader automatically upon the death of the current leader. The Echo Server code is shown below:
public class EchoServer extends Thread implements IZkDataListener, IZkStateListener {
  private ZkClient zkClient;
  // My ZNode
  private SequentialZkNode zkNode;

  // My leader ZNode
  private SequentialZkNode zkLeaderNode;
  private ServerSocket serverSocket;
  
  // Port to start server on
  private final Integer port;
  private final List<SlaveJob> slaves;
  private final Object mutex = new Object();
  private final AtomicBoolean start = new AtomicBoolean(false);
  private final Random random = new Random();

  // Flag flipped on shutdown to stop the accept loop
  private volatile boolean shutdown;

  public EchoServer(int port, int zkPort) {
    this.port = port;
    this.zkClient = new ZkClient("localhost:" + zkPort);
    this.slaves = new ArrayList<SlaveJob>();
  }

  @Override
  public void run() {
    try {
      // Sleep a random amount to add some randomness to leader selection
      Thread.sleep(random.nextInt(1000));

      // Create an ephemeral, sequential node under /echo
      zkNode = ZkUtils.createEphermalNode("/echo", port, zkClient);

      // Find all children
      NavigableSet<SequentialZkNode> nodes = ZkUtils.getNodes("/echo", zkClient);

      // Find my leader
      zkLeaderNode = ZkUtils.findLeaderOfNode(nodes, zkNode);

      // If I am leader, enable me to start
      if (zkLeaderNode.getPath().equals(zkNode.getPath())) {
        start.set(true);
      } else {
        // Set a watch on next leader path
        zkClient.subscribeDataChanges(zkLeaderNode.getPath(), this);
        zkClient.subscribeStateChanges(this);
      }

      synchronized (mutex) {
        while (!start.get()) {
          LOG.info("Server on Port:" + port + " waiting to spawn socket....");
          mutex.wait();
        }
      }
      // Start accepting connections and servicing requests 
      this.serverSocket = new ServerSocket(port);

      Socket clientSocket = null;
      
      while (!Thread.currentThread().isInterrupted() && !shutdown) {
        clientSocket = serverSocket.accept();
        // ... start slave job
      }
    }
    catch (Exception e) {
     ....
    }
  }

  private void electNewLeader() {
    final NavigableSet<SequentialZkNode> nodes = ZkUtils.getNodes("/echo", zkClient);

    if (!zkClient.exists(zkLeaderNode.getPath())) {
      // My Leader does not exist, find the next leader above
      zkLeaderNode = ZkUtils.findLeaderOfNode(nodes, zkNode);
      zkClient.subscribeDataChanges(zkLeaderNode.getPath(), this);
      zkClient.subscribeStateChanges(this);
    }

    // If I am the leader then start
    if (zkNode.getSequence().equals(nodes.first().getSequence())) {
      LOG.info("Server on port:" + port + "  will now be notified to assume leadership");
      synchronized (mutex) {
        start.set(true);
        mutex.notify();
      }
    }
  }
  .....

  @Override
  public void handleDataDeleted(String dataPath) throws Exception {
    if (dataPath.equals(zkLeaderNode.getPath())) {
      // Leader gone away
      LOG.info("Recieved a notification that Leader on path:" + dataPath
        + " has gone away..electing new leader");
      electNewLeader();
    }
  }
}

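The SequentialZkNode and ZkUtils helpers used above ship with the downloadable project and are not listed in this post. Purely as an illustration of what they might look like on top of ZkClient (this is my sketch, not the project's actual code), consider:

import java.util.NavigableSet;
import java.util.TreeSet;

import org.I0Itec.zkclient.ZkClient;

public class ZkUtils {

  // Nested here for brevity; pairs a ZNode path with its parsed sequence number for ordering
  public static class SequentialZkNode implements Comparable<SequentialZkNode> {
    private final String path;
    private final Integer sequence;

    public SequentialZkNode(String path, Integer sequence) {
      this.path = path;
      this.sequence = sequence;
    }

    public String getPath() {
      return path;
    }

    public Integer getSequence() {
      return sequence;
    }

    @Override
    public int compareTo(SequentialZkNode other) {
      return sequence.compareTo(other.sequence);
    }
  }

  // Create the parent if needed and register an ephemeral, sequential child holding the data
  public static SequentialZkNode createEphermalNode(String parent, Object data, ZkClient zkClient) {
    if (!zkClient.exists(parent)) {
      zkClient.createPersistent(parent, true);
    }
    String path = zkClient.createEphemeralSequential(parent + "/n_", data);
    return new SequentialZkNode(path, sequenceOf(path));
  }

  // All current children of the parent, ordered by their sequence numbers
  public static NavigableSet<SequentialZkNode> getNodes(String parent, ZkClient zkClient) {
    NavigableSet<SequentialZkNode> nodes = new TreeSet<SequentialZkNode>();
    for (String child : zkClient.getChildren(parent)) {
      String path = parent + "/" + child;
      nodes.add(new SequentialZkNode(path, sequenceOf(path)));
    }
    return nodes;
  }

  // The node immediately preceding mine in sequence order, or myself if I am first (the leader)
  public static SequentialZkNode findLeaderOfNode(NavigableSet<SequentialZkNode> nodes,
      SequentialZkNode me) {
    SequentialZkNode predecessor = nodes.lower(me);
    return predecessor != null ? predecessor : me;
  }

  private static Integer sequenceOf(String path) {
    return Integer.valueOf(path.substring(path.lastIndexOf("n_") + 2));
  }
}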
The Echo Client is shown below which connects to the leader of the Echo Server ensemble:
public class EchoClient implements IZkDataListener, IZkStateListener, IZkChildListener {
  private Connection connection;
  private Object mutex = new Object();
  private final ZkClient zkClient;
  private SequentialZkNode connectedToNode;

  public EchoClient(int zkPort) {
    this.zkClient = new ZkClient("localhost:" + zkPort);
  }

  private Connection getConnection() throws UnknownHostException, IOException, InterruptedException {
    synchronized (mutex) {
      if (connection != null) {
        return connection;
      }

      NavigableSet<SequentialZkNode> nodes = null;
      
      // Check to see if an Echo Server exists; if not, listen on /echo for
      // a server to become available
      while ((nodes = ZkUtils.getNodes("/echo", zkClient)).size() == 0) {
        LOG.info("No echo service nodes ...waiting...");
        zkClient.subscribeChildChanges("/echo", this);
        mutex.wait();
      }

      // Get the leader and connect
      connectedToNode = nodes.first();
      Integer port = zkClient.readData(connectedToNode.getPath());
      connection = new Connection(new Socket("localhost", port));
      
      // Subscribe to Server changes
      zkClient.subscribeDataChanges(connectedToNode.getPath(), this);
      zkClient.subscribeStateChanges(this);
    }

    return connection;
  }

  public String echo(String command) throws InterruptedException {
    while (true) {
      try {
        return getConnection().send(command);
      }
      catch (InterruptedException e) {
        throw e;
      }
      catch (Exception e) {
        synchronized (mutex) {
          if (connection != null) {
            connection.close();
            connection = null;
          }
        }
      }
    }
  }

  private static class Connection { 
    // Code that connects and sends data to Echo Server
    public String send(String message) {
      ....
    }
  }

  @Override
  public void handleDataDeleted(String dataPath) throws Exception {
    synchronized (mutex) {
      if (dataPath.equals(connectedToNode.getPath())) {
        LOG.info("Client Received notification that Server has died..notifying to re-connect");
        if (connection != null) {
          connection.close();
        }
        if (connectedToNode != null) {
          connectedToNode = null;
        }
      }
      mutex.notify();
    }
  }

  @Override
  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
    synchronized (mutex) {
      LOG.info("Got a notification about a Service coming up...notifying client");
      mutex.notify();
    }
  }
}

A simple test of the example starts an EchoClient and two servers. One of the servers assumes leadership and starts servicing requests while the other server sits waiting to assume leadership. The client in turn sends messages which are serviced by the leader. When the leader terminates, messages are then serviced by the successor that assumes leadership.
  @Test
  public void simpleLeadership() throws InterruptedException {

    EchoClient client = new EchoClient(ZK_PORT);
    EchoServer serverA = new EchoServer(5555, ZK_PORT);
    EchoServer serverB = new EchoServer(5556, ZK_PORT);

    serverA.start();
    serverB.start();
 
    // Send messages
    for (int i = 0; i < 10; i++) {
      System.out.println("Client Sending:Hello-" + i);
      System.out.println(client.echo("Hello-" + i));
    }
    
    if (serverA.isLeader()) {
      serverA.shutdown();
    } else {
      serverB.shutdown();
    }
 
    for (int i = 0; i < 10; i++) {
      System.out.println("Client Sending:Hello-" + i);
      System.out.println(client.echo("Hello-" + i));
    }

    serverA.shutdown();
    serverB.shutdown();
  }
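The test also calls isLeader() and shutdown() on the EchoServer, which were elided from the listing above; hypothetically, they could be as simple as the following sketch (the actual implementations are part of the downloadable project):

  // Hypothetical sketch of the elided EchoServer methods
  public boolean isLeader() {
    return start.get();
  }

  public void shutdown() throws InterruptedException {
    shutdown = true; // stops the accept loop
    try {
      if (serverSocket != null) {
        serverSocket.close(); // unblocks accept()
      }
    }
    catch (IOException ignore) {
    }
    zkClient.close(); // ends the session so the ephemeral node disappears
    join();
  }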
On running the test, you can see output similar to:
20:18:49 INFO - com.welflex.zookeeper.EchoClient.getConnection(44) | No echo service nodes ...waiting...
20:18:49 INFO - com.welflex.zookeeper.EchoClient.handleChildChange(171) | Got a notification about a Service coming up...notifying client
20:18:49 INFO - com.welflex.zookeeper.EchoServer.run(85) | Server on Port:5556 is now the Leader. Starting to accept connections...
Client Sending:Hello-0
Echo:Hello-0
Client Sending:Hello-1
Echo:Hello-1
.....
20:18:49 INFO - com.welflex.zookeeper.EchoServer.run(122) | Server on Port:5556 has been shutdown
20:18:49 INFO - com.welflex.zookeeper.EchoClient.getConnection(44) | No echo service nodes ...waiting...
Server has died..notifying to re-connect
20:18:49 INFO - com.welflex.zookeeper.EchoClient.getConnection(44) | No echo service nodes ...waiting...
20:18:49 INFO - com.welflex.zookeeper.EchoClient.handleChildChange(171) | Got a notification about a Service coming up...notifying client
20:18:49 INFO - com.welflex.zookeeper.EchoServer.run(85) | Server on Port:5555 is now the Leader. Starting to accept connections...
20:18:49 INFO - com.welflex.zookeeper.EchoClient.handleChildChange(171) | Got a notification about a Service coming up...notifying client
....
Echo:Hello-7
Client Sending:Hello-8
Echo:Hello-8
Client Sending:Hello-9
Echo:Hello-9
....

2. Rendezvous/Barrier Example: 

Distributed systems can use the concept of a barrier to block the processing of a certain task until other members of the system ensemble are available, after which all the participating systems can proceed with their tasks. Having a little fun here :-) with a fictional case where Double O agents of MI-6 meet to present their reports, and reports are not edited here if you know what I mean ;-). Agents join the meeting but cannot start presenting until all the expected agents have joined, after which they are free to present in parallel (agents are designed to grasp from multiple presenters; after all, the 007 variety are trained at such things). Agents can present but cannot leave the briefing until all presenters have completed their presentations. An agent upon start up creates a ZNode under the meeting identifier, say "/M-debrief", and then waits for all other agents to join before beginning its presentation, i.e., barrier entry. After each presentation, the agent removes itself from the list of ZNodes and then waits for the other agents to do the same before leaving the briefing, i.e., barrier exit. With that said, the Agent code looks like:
public class Agent extends Thread implements IZkChildListener {
  // Number of agents total
  private final int agentCount;

  private final String agentNumber;

  private final Object mutex = new Object();

  private final Random random = new Random();

  private SequentialZkNode agentRegistration;
  
  private final String meetingId;

  private final ZkClient zkClient;

  public Agent(String agentNumber, String meetingId, int agentCount, int zkPort) {
    this.agentNumber = agentNumber;
    this.meetingId = meetingId;
    this.agentCount = agentCount;
    this.zkClient = new ZkClient("localhost:" + zkPort);
    zkClient.subscribeChildChanges(meetingId, this);
  }

  private void joinBriefing() throws InterruptedException {
    Thread.sleep(random.nextInt(1000));
    agentRegistration = ZkUtils.createEpheremalNode(meetingId, agentNumber, zkClient);
    LOG.info("Agent:" + agentNumber + " joined Briefing:" + meetingId);
    
    // Wait for all other agents to join the meeting
    while (true) {
      synchronized (mutex) {
        List<String> list = zkClient.getChildren(meetingId);

        if (list.size() < agentCount) {
          LOG.info("Agent:" + agentNumber + " waiting for other agents to join before presenting report...");
          mutex.wait();
        }
        else {
          break;
        }
      }
    }
  }

  private void presentReport() throws InterruptedException {
    LOG.info("Agent:" + agentNumber + " presenting report...");
    Thread.sleep(random.nextInt(1000));
    LOG.info("Agent:" + agentNumber + " completed report...");

    // Completion of a report is identified by deleting their ZNode
    zkClient.delete(agentRegistration.getPath());
  }

  private void leaveBriefing() throws InterruptedException {

    // Wait for all agents to complete
    while (true) {
      synchronized (mutex) {
        List<String> list = zkClient.getChildren(meetingId);
        if (list.size() > 0) {
          LOG.info("Agent:" + agentNumber
            + " waiting for other agents to complete their briefings...");
          mutex.wait();
        }
        else {
          break;
        }
      }
    }
    LOG.info("Agent:" + agentNumber + " left briefing");
  }

  @Override
  public void run() {
    try {
      joinBriefing();
      presentReport();
      leaveBriefing();
    }
    catch (InterruptedException e) {
      // Preserve the interrupt status and exit the agent thread
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
    synchronized (mutex) {
      mutex.notifyAll();
    }
  }
}
Running the test, one would see something like the following, where agents join the meeting, wait for the others before presenting, present at the meeting, and then leave after everyone completes. Note that the output is edited in the interest of real estate:
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(38) | Agent:003 joined Briefing:/M-debrief
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:003 waiting for other agents to join before presenting report...
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(38) | Agent:005 joined Briefing:/M-debrief
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:005 waiting for other agents to join before 
.....
:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:005 waiting for other agents to join before presenting report...
......
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:006 waiting for other agents to join before presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.joinBriefing(38) | Agent:007 joined Briefing:/M-debrief
21:02:18 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:007 waiting for other agents to join before presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:002 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:005 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:007 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:003 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:001 presenting report...
....
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:002 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:002 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(59) | Agent:000 completed report...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:000 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:002 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:000 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(59) | Agent:007 completed report...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:007 waiting for other agents to complete their briefings...
....
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:007 left briefing
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:003 left briefing
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:002 left briefing
...
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:000 left briefing

Running the code:

Download a maven example of the above code from HERE. Execute "mvn test" to see the example in action or import it into Eclipse to dissect the code. The example itself runs its tests against a single ZooKeeper server. There is no reason why the same cannot be expanded to try on a larger ZooKeeper ensemble.

Conclusion and Credits:

The examples shown are highly influenced by the examples provided at the ZooKeeper site. The code does not use the ZooKeeper API directly but instead uses a wrapper around the same, i.e., ZkClient, which provides a much easier experience than the default ZooKeeper API. The examples also rely heavily on the excellent article and supporting code by Erez Mazor, who utilizes the Spring Framework very nicely to demonstrate Leader Election and control of the same. I simply loved his code and images and have used parts of the same in the above demonstration. I am sure I have only touched the surface of ZooKeeper and am hoping I can learn more about the same. If my understanding of any of the concepts mentioned above is flawed, I would love to hear about it. Again, a very Happy 2012! Keep the faith, the world won't end on December 21st.

Wednesday, December 28, 2011

WebLogic JMS Partitioned Distributed Topics and Shared Subscriptions

I had previously blogged about the challenges of scaling topics with durable subscribers. In JMS, with topics, one can typically have a single topic subscriber for a particular [Client ID, Subscription Name] tuple.

In the example shown below the Order Topic has two durable subscribers, the Order Processor and the Auditor. Both of these get the messages sent to the topic, but at any given time there can be exactly one of each consuming a message. There cannot be competing durable subscriptions as in the case of queues. The same translates to a scalability and availability challenge:
Oracle WebLogic has introduced the concept of a Partitioned Distributed Topic and shared connection subscriptions that allow more than one durable subscription to be made available for a particular subscriber. With a shared subscription, consumers of an "application" compete for messages of a Topic, and this feature promotes scalability and availability. In the figure shown below there are two Order Processors competing for messages, and so are the two Auditors. At no time will a message be delivered to both consumers of the same "application":

WebLogic introduces the concept of an UNRESTRICTED Client ID policy. With this setting, more than one connection in a cluster can share the same Client ID. The standard option of RESTRICTED enforces that only a single connection with a particular Client ID can exist in a cluster.

Sharing Subscriptions:
There are two policies for Subscription sharing:

1. Exclusive - This is the default and all subscribers created using the connection factory cannot share subscriptions with any other subscribers.

2. Sharable - Subscribers created using this connection factory can share their subscriptions with other subscribers. Consumers can share a durable subscription if they have the same [Client ID, Client ID policy and subscription name].

To promote HA, set the Subscription on the Topic as Shareable.
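For illustration, each competing consumer is still created with the plain JMS durable subscriber API; the sharing itself comes from the WebLogic settings described above, not from extra API calls. A sketch follows, in which the JNDI names are made up:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;

public class OrderProcessorInstance {
  public static void main(String[] args) throws Exception {
    InitialContext ctx = new InitialContext(); // assumes WebLogic JNDI properties are provided

    // Connection factory configured with the sharing and client ID policies discussed above
    // (hypothetical JNDI names)
    ConnectionFactory cf = (ConnectionFactory) ctx.lookup("jms/orderConnectionFactory");
    Topic orderTopic = (Topic) ctx.lookup("jms/orderTopic");

    // Every Order Processor instance uses the same client ID and subscription name,
    // so they share one durable subscription and compete for its messages
    Connection connection = cf.createConnection();
    connection.setClientID("orderConnectionId");

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber subscriber = session.createDurableSubscriber(orderTopic, "orderProcSubscription");
    subscriber.setMessageListener(new MessageListener() {
      public void onMessage(Message message) {
        System.out.println("Order processor got a message");
      }
    });
    connection.start();
  }
}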

When creating a Topic in WebLogic, set the Forwarding Policy to Partitioned. This causes a message sent to a partitioned distributed topic to be sent to a single physical member. In addition, the message will not be forwarded to other members of the cluster if there are no consumers on the current physical member.

If we want an HA solution, then the listeners will need to connect to each and every physical member across the cluster. Consider the following figure: on Consumer Machine 1 there are two consumers of Type A and two consumers of Type B; the same applies to Consumer Machine 2. It is important to note that consumers of a type or application connect to both physical members of the cluster. Doing so ensures that in the event a consumer machine dies unexpectedly, the other consumer machine can still continue to function, ensuring availability:


The above can be achieved using Spring Framework's Message Listener Container with some wrapper code as shown below where the PartitionedTopicContainer is a container of containers connecting to each physical member of the topic with the same client ID and subscription name:
public class PartitionedTopicContainer {
  private final List<DefaultMessageListenerContainer> containers;

  public PartitionedTopicContainer(String clientID, String subscriptionName, ConnectionFactory connectionFactory, Destination ...physicalTopicMembers) {
    this.containers = Lists.newArrayList();

    for (Destination physicalMember : physicalTopicMembers) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

      container.setConnectionFactory(connectionFactory);
      container.setDestination(physicalMember);
      container.setClientId(clientID);
      container.setDurableSubscriptionName(subscriptionName);

      // Track the container so that the listener, start and shutdown calls reach it
      containers.add(container);
    }
  }

  public void setMessageListener(Object listener) {
    for (DefaultMessageListenerContainer container : containers) {
      container.setMessageListener(listener);
    }
  }

  public void start() {
    for (DefaultMessageListenerContainer container : containers) {
      container.start();
    }
  }

  public void shutdown() {
    for (DefaultMessageListenerContainer container : containers) {
      container.shutdown();
    }
  }
}
The above container could then be used as follows:
// Obtain each physical member of the partitioned distributed topic
Destination physicalMember1 = (Destination) context.lookup("Server1@orderTopic");
Destination physicalMember2 = (Destination) context.lookup("Server2@orderTopic");

// Container for the Order Processor
PartitionedTopicContainer subscriber1 = new PartitionedTopicContainer("orderConnectionId", "orderProcSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber1.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber order processor got a message...");
    ...
  }
});
// Container for the Auditor
PartitionedTopicContainer subscriber2 = new PartitionedTopicContainer("auditorConnectionId", "auditorSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber2.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber auditor got a message...");
    ...
  }
});

subscriber1.start();
subscriber2.start();


The parts of the code above where a consumer of the API has to look up each and every physical member and provide the same to the container are a lot of boilerplate and do not account well for cases where a physical member becomes available/unavailable. Luckily, WebLogic provides the JMSDestinationAvailabilityHelper API, which is a way to listen to events relating to physical member availability and unavailability. The PartitionedTopicContainer shown above could easily be augmented with the availability helper API to get notified of physical destination availability and unavailability and correspondingly start and stop the internal containers for the physical destinations. Pseudo-code of how this can be achieved with the above container is shown below:

public class PartitionedTopicContainer implements DestinationAvailabilityListener {
  private final String clusterUrl;

  private final String clientID;

  private final String subscriptionName;

  private final String partDistTopicJndi;

  private final ConnectionFactory connectionFactory;
   
  @GuardedBy("containerLock")
  private final Map<String, DefaultMessageListenerContainer> containerMap;

  private final Object containerLock = new Object();

  // WebLogic Handle
  private RegistrationHandle registrationHandle;

  private final CountDownLatch startLatch = new CountDownLatch(1);

  public PartitionedTopicContainer(String clientID, String subscriptionName, String clusterUrl,
    ConnectionFactory connectionFactory, String partDistTopicJndi) {
    this.clusterUrl = clusterUrl;
    this.clientID = clientID;
    this.subscriptionName = subscriptionName;
    this.partDistTopicJndi = partDistTopicJndi;
    this.containerMap = Maps.newHashMap();
    this.connectionFactory = connectionFactory;
  }

  public void start() throws InterruptedException {
    Hashtable<String, String> jndiProperties = ...;
    jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
    jndiProperties.put(Context.PROVIDER_URL, clusterUrl);
    
    JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
   
    // Register this container as a listener for destination events
    registrationHandle = dah.register(jndiProperties, partDistTopicJndi, this);
   
    // Wait for listener notification to start container
    startLatch.await();
  }

  @Override
  public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
    synchronized (containerLock) {
      // For all Physical destinations, start a container
      for (DestinationDetail detail : physicalAvailableMembers) {
        Destination physicalMember = lookupPhysicalTopic(detail.getJNDIName());
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        
        container.setConnectionFactory(connectionFactory);
        container.setDestination(physicalMember);
        container.setClientId(clientID);
        container.setDurableSubscriptionName(subscriptionName);
        System.out.println("Starting Container for physical Destination:" + detail);
        container.start();
        containerMap.put(detail.getJNDIName(), container);
      }
    }
    startLatch.countDown();
  }
  
  @Override
  public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
    synchronized (containerLock) {
      // Shutdown all containers whose physical members are no longer available
      for (DestinationDetail detail : physicalUnavailableMembers) {
        DefaultMessageListenerContainer container = containerMap.remove(detail.getJNDIName());
        container.shutdown();
      }
    }
  }

  @Override
  public void onFailure(String destJndiName, Exception exception) {
    // Looks like a cluster wide failure
    shutdown();
  }

  public void shutdown() {
    // Unregister for events about destination availability
    registrationHandle.unregister();

    // Shut down containers
    synchronized (containerLock) {
       for (Iterator<Map.Entry<String, DefaultMessageListenerContainer>> i = containerMap.entrySet().iterator(); i
          .hasNext();) {
        Map.Entry<String, DefaultMessageListenerContainer> entry = i.next();
        System.out.println("Shutting down container for:" + entry.getKey());
        entry.getValue().shutdown();
        i.remove();
      }
    }  
  }
}   

Some of the things to remember when creating a partitioned distributed topic and shared subscription:

1. Connection Factory being used should have "Subscription Sharing Policy" set as "Shareable"
2. Forwarding policy on the Partitioned Distributed Topic should be set as "Partitioned"
3. Message forwarding will not occur, so subscribers must ensure connections exist to every physical member; otherwise messages can pile up for the subscription on that member
4. If a server hosting a physical member is unavailable, then messages from that physical member will be unavailable until the server is made available

Partitioned Distributed Topics and Shared Subscriptions look promising. One thing I need to sort out is how one handles error destinations on a per-subscription level with WebLogic. Any passer-by with thoughts, please do shoot them my way.

Tuesday, October 4, 2011

Jersey JAX-RS and JAXB Schema Validation

This BLOG is about Jersey web services and Schema based validation with the same. I have also been playing with Google Guice (I hear it, Spring Traitor, err not quite) :-) and figured I'd use Guice instead of Spring for once.

When unmarshalling XML using JAXB, schema based validation facilitates stricter validation. The Jersey recommended approach to enable schema based validation is to create a javax.ws.rs.ext.ContextResolver. I have seen examples using a javax.ws.rs.ext.MessageBodyReader as well. The code demonstrated is largely based on and influenced by the discussion on XSD validation between Andrew Cole and Paul Sandoz. The goals of the resolver are:
  1. Enable schema based validation if desired
  2. Provide the ability to enable a custom validation event handler
  3. Enable formatted JAXB and the ability to set the character encoding
It is said that JAXB contexts are better off cached as they are expensive to create. I am only going by what I have read in different posts and/or discussions and do not have any metrics to back the same. A generic JAXB context resolver that accomplishes the above is shown here:
public class JaxbContextResolver implements ContextResolver<JAXBContext> {
  static final ConcurrentMap<String, JAXBContext> CONTEXT_MAP = new MapMaker()
      .makeComputingMap(new Function<String, JAXBContext>() {

        @Override
        public JAXBContext apply(String fromPackage) {
          try {
            return JAXBContext.newInstance(fromPackage);
          }
          catch (JAXBException e) {
            throw new RuntimeException("Unable to create JAXBContext for Package:" + fromPackage, e);
          }
        }
      });
  
  private Schema schema;
  private ValidationEventHandler validationEventHandler;
  
  public JaxbContextResolver withSchema(Schema schema) {
    this.schema = schema;
    return this;
  }
  ...
  public JaxbContextResolver withValidationEventHandler(ValidationEventHandler validationEventHandler) {
    this.validationEventHandler = validationEventHandler;
    return this;
  }
  
  @Override
  public JAXBContext getContext(Class<?> type) {
    return new ValidatingJAXBContext(CONTEXT_MAP.get(type.getPackage().getName()),
      schema, formattedOutput, encoding, validationEventHandler);
  }
   ...
  public static class ValidatingJAXBContext extends JAXBContext {
    private final JAXBContext contextDelegate;
     ....
    private final ValidationEventHandler validationEventHandler;

    @Override
    public javax.xml.bind.Unmarshaller createUnmarshaller() throws JAXBException {
      javax.xml.bind.Unmarshaller unmarshaller = contextDelegate.createUnmarshaller();
      
      // Set the Validation Handler
      if (validationEventHandler != null) {
        unmarshaller.setEventHandler(validationEventHandler);
      }
      
      // Set the Schema
      if (validatingSchema != null) {
        unmarshaller.setSchema(validatingSchema);
      }

      return unmarshaller;
    }

    @Override
    public javax.xml.bind.Marshaller createMarshaller() throws JAXBException {
      javax.xml.bind.Marshaller m = contextDelegate.createMarshaller();
      m.setProperty("jaxb.formatted.output", formattedOutput);

      if (encoding != null) {
        m.setProperty("jaxb.encoding", encoding);
      }

      return m;
    }

    @Override
    public Validator createValidator() throws JAXBException {
      return contextDelegate.createValidator();
    }
   }
 }
The Context resolver itself is registered as a Singleton with Jersey in an Application class. For example:
public class NotesApplication extends Application {
  private Set<Object> singletons;
  
  public NotesApplication() {
   ....
    Schema schema = null;
   
    try {
      schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI).newSchema(
        getClass().getResource("/notes.xsd"));
    }
    catch (SAXException e) {
      throw new RuntimeException("Error obtaining Schema", e);
    }

    JaxbContextResolver resolver = new JaxbContextResolver().withSchema(schema)
      .formatOutput(true);
  
    Set<Object> single = Sets.newHashSet();
    single.add(resolver);
    singletons = Collections.unmodifiableSet(single);
  }
  
  public Set&lt;Object> getSingletons() {
    return singletons;
  }
}
With the above, Unmarshallers are provided that utilize the provided schema when unmarshalling the received payload. When an error occurs during unmarshalling, Jersey catches the javax.xml.bind.UnmarshalException, wraps it in a javax.ws.rs.WebApplicationException and throws the same. If one desires to customize the response sent back to the consumer, the only option is to create a javax.ws.rs.ext.ExceptionMapper for javax.ws.rs.WebApplicationException and interrogate the cause to determine whether it was a javax.xml.bind.UnmarshalException. I could not find a way to map the unmarshal exception thrown by JAXB directly to an ExceptionMapper. If anyone has done so, I would love to hear about their solution.
class WebApplicationExceptionMapper implements ExceptionMapper<WebApplicationException> {
   public Response toResponse(WebApplicationException e) {
      if (e.getCause() instanceof UnmarshalException) {
         return Response.status(400).entity("Tsk Tsk, XML is horrid and you provide the worst possible one?").build();
      }
      }
      else {
         return Response.....
      }
  }
}
Sometimes one does not need the full-fledged validation benefits of a schema and can make do with a ValidationEventHandler. In such a case, one can provide the JaxbContextResolver with an instance of a javax.xml.bind.ValidationEventHandler. The handler could then be configured to throw a custom exception which can be mapped to a custom response using an ExceptionMapper as shown below. This approach is what appears on the Jersey mailing list entry:
JaxbContextResolver resolver = new JaxbContextResolver().withValidationEventHandler(new ValidationEventHandler() {
     public boolean handleEvent(ValidationEvent event) {
       if (event.getSeverity() == ValidationEvent.WARNING) {
          // Log and return
          return true;
      }
       throw new CustomRuntimeException(event);
     }
   });

@Provider
class CustomRuntimeExceptionMapper implements ExceptionMapper<CustomRuntimeException> {
   public Response toResponse(CustomRuntimeException e) {
      return Response.status(400).entity("Oooops:" + e).build();
   }
}
Note that throwing a custom exception and catching the same via an ExceptionMapper will work only if one does NOT provide a Schema. Once a schema is in place, the exception will be caught and swallowed by JAXB, and one has to catch WebApplicationException and provide a custom response as described earlier.

An example provided herewith demonstrates a simple Notes service that manages the life cycle of a Note. It employs Guice to inject dependencies into Resource and Sub-Resource classes. The application also demonstrates the JaxbContextResolver and the registration of a schema for validating a received Note payload. Quite sweet actually. The details of integrating Guice and Jersey are not detailed in this BLOG as there is already a pretty thorough BLOG by Iqbal Yusuf that describes the same.

Download the maven example by clicking HERE, extract the archive and simply execute "mvn install" at the root level to see a client-server interaction. If any passer-by is a JAXB guru or has any tips on the Resolver, I would love to hear them. Enjoy!

Friday, September 9, 2011

Documenting Web Services with Enunciate


Understanding the workings of a Web Service from the perspective of a consumer, be it REST or SOAP based, is not the easiest of tasks. Wading through a WADL or a WSDL is not a fun exercise, at least not for me. I have been looking into an open source project that has been around for some time called Enunciate and am pretty impressed by its capabilities.

Enunciate provides automated documentation of your resources and end points. It also has the ability to generate client artifacts for consuming the JAX-RS services in many different programming languages.

The process itself is quite simple. After creating your web service you have Enunciate run as part of your build process and it generates nice HTML based documentation of your service along with client code (Yeah!) in different languages to consume the service. Using annotation based web services facilitates this nice magic, I guess.

To experience enunciate for myself, I created a simple web service that exposes SOAP and REST based calls of the same underlying API.

A few simple criteria that I had for my sample application:
  • Documentation should be available at the Root context
  • Resources and Services must also be available at the Root context
To my pom, I added the necessary enunciate dependencies as shown below:
                                       
<dependencies>
  ....
  <dependency>
    <groupId>org.codehaus.enunciate</groupId>
    <artifactId>enunciate-rt</artifactId>
    <version>1.24</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    ...
    <plugin>
      <groupId>org.codehaus.enunciate</groupId>
      <artifactId>maven-enunciate-plugin</artifactId>
      <version>1.24</version>
      <configuration>
        <configFile>enunciate.xml</configFile>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>assemble</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
To customize where my service's resources will be available, i.e., at the ROOT context I created an enunciate.xml file:
<enunciate xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:noNamespaceSchemaLocation="http://enunciate.codehaus.org/schemas/enunciate-1.21.xsd">
  
	<services>
		<rest defaultRestSubcontext="/"/>
		<soap defaultSoapSubcontext="/"/>
	</services>

	<modules>
		<spring-app disabled="true" />
		<docs docsDir="/" splashPackage="com.welflex.rest" title="Greetings Web Service API"
			copyright="Welflex"/>
	</modules>
</enunciate>
After building and running the web service, a nicely generated introduction page is produced that looks like the following:


Navigating the application, one can view the Greeting Resource and Greeting Web Service end point documentation:

Generated libraries for consuming the service are available for download as well along with documentation on how to use them, pretty sweet huh?


A maven example demonstrating the above is available for DOWNLOAD HERE

Once you download the example, execute:
>mvn install
>mvn jetty:run-exploded

Access the service at http://localhost:8080

Note that executing mvn jetty:run would give me 404s; my only option was run-exploded. Enunciate creates an altered web.xml with its own custom classes. I am also curious to know whether using Enunciate impacts the performance of the web service, and if so, by how much. Enunciate also facilitates skinning the documentation; I have not done the same, but from the documentation it looks pretty straightforward.

I found that if one has a custom Servlet Filter and/or Listener defined in the project's web.xml then to get the same included into the final web.xml generated by enunciate, one would need to set the following in enunciate.xml.
<webapp mergeWebXML="src/main/webapp/WEB-INF/web.xml" />
Upon merging the same, my application started throwing 404's when attempting to access the documentation. In other words, using a custom servlet filter or listener with my use case of resources mounted at the root context does not seem to work and needs further investigation. Not being able to set custom Filters and Listeners is a major blocker.

Another tool that I have included in the pom is JAX-Doclets, which generates nice javadoc for the JAX-RS portion of the service. To view the same, execute mvn site and navigate to the generated documentation.

Friday, February 25, 2011

Hadoop Pig and Java Map Reduce - A maven example

Recently I have been involved with the Hadoop family and as always would like to share :-). I am hoping to provide an individual interested in evaluating Map-Reduce and Apache Pig with a starter project for the same.

At the core of Hadoop lie HDFS and the ability to perform Map-Reduce operations on data. Leaning on my previous example of Cassandra Map Reduce, this BLOG will help demonstrate how Map-Reduce with Hadoop can be achieved using simple plain ole Java or its exotic cousin, Apache Pig.

Before getting started with the example, you will need to get Hadoop running in a pseudo distributed mode at the very least.  As a user of Ubuntu, I found the BLOG by Rahul Patodi to be a great start to installing Cloudera's Hadoop version. Alternatively, you can do the same by following the instructions on the Cloudera WIKI.

The example used in this BLOG uses a file that contains "comments" that are written to a file in HDFS in JSON format and subsequently demonstrates how Map Reduce jobs can be executed either in Java or in Pig. The jobs themselves check for certain "Key words" of interest within the comments posted, think Web Bot here :-). An example comments file could look like:
{"commenterId":"donaldduck","comment":"The world is a cave. James bond lives in a Cave.","country":"TANZANIA"}
{"commenterId":"factorypilot","comment":"Only a cave man could do this","country":"JAPAN"}
{"commenterId":"nemesis","comment":"Felix Lighter and James Bond work well together as they are cave men","country":"BRAZIL"}
{"commenterId":"jamesbond","comment":"James Bond would be dead without Q to help him.","country":"GERMANY"}

Java Map-Reduce
For the Java version, one would write a Mapper that extracts the comment value, checks it for occurrences of the words of interest and emits a count for each, while a Reducer in turn totals the results, as shown below:

public class CommentWordMapReduce {
  /**
   * Mapper
   */
  public static class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);

    private Text word = new Text();

    private static final String COMMENT_KEY = "comment";
    private static final Set<String> WORDS_TO_GET_COUNT_OF = new HashSet<String>(Arrays
        .asList(new String[] { "james", "2012", "cave", "walther", "bond" }));

    private final JSONParser parser = new JSONParser();
    
    
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException,
      InterruptedException {
      JSONObject jsonObj;
      try {
        jsonObj = (JSONObject) parser.parse(value.toString());
      }
      catch (ParseException e) {
        // Hmm unable to Parse the JSON, off to next record, better log though :-)
        e.printStackTrace();
        return;
      }
      String comment = jsonObj.get(COMMENT_KEY).toString();
      StringTokenizer tokenizer = new StringTokenizer(comment);

      while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken().toLowerCase();
        if (WORDS_TO_GET_COUNT_OF.contains(token)) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  /**
   * Reducer
   */
  public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
      InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  /**
   * @param inputPath The input file location from HDFS
   * @param outputPath Where to store results of the Map-Reduce
   */
  public boolean mapred(String inputPath, String outputPath) throws IOException,
    InterruptedException,
    ClassNotFoundException {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "process word count");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(WordMap.class);
    job.setReducerClass(WordReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    
    job.setNumReduceTasks(1);
    
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
    job.waitForCompletion(true);

    return job.isSuccessful();
  }
}
A unit test provided simply ensures that the expected count for each word of interest in fact matches up with the actual count, as shown below:
public void javaMapReduce() throws Exception {
   assertTrue(new CommentWordMapReduce().mapred(INPUT_HDFS_FILE, OUTPUT_HDFS_DIR));
   validateResult();
}
If you wish to see the contents of the map-reduce job then execute:
>hadoop fs -cat /users/junittest/wcresults/part-r-00000
2012 1828
bond 4490
cave 2769
james 3631
walther 921
Pig Map-Reduce:

For the Pig Map-Reduce equivalent of the example, there are two helper classes that I define: a custom LoadFunc (loader function) for the JSON file and a FilterFunc (filter) to only include the words of interest. The custom JSON loader is courtesy of Kim Vogt from GitHub, and the Like Filter is a non-comprehensive version that I defined as follows:

public class LikeFilter extends FilterFunc {

  public Boolean exec(Tuple input) throws IOException {
    if (input == null || input.size() < 2) {
      // If the word and at least one filter condition are not provided, filter the tuple out
      return Boolean.FALSE;
    }
   
    List<Object> elems = input.getAll();
    
    // First element is the word presented, for example "foo", "bar" or "bond"
    Object expected = input.getAll().get(0);
    
    // Subsequent elements are the filter conditions
    List<Object> comparators = elems.subList(1, elems.size());
    
    return comparators.contains(expected);
  }
}
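Kim Vogt's JSON loader is included in the download rather than reproduced here. Purely as a rough sketch of what such a line-oriented JSON LoadFunc could look like on Pig 0.8 (assuming json-simple for parsing; this is not the actual loader), consider:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

// Sketch only: each input line is parsed as JSON and emitted as a single-field tuple holding a map
public class PigJsonLoader extends LoadFunc {
  private final TupleFactory tupleFactory = TupleFactory.getInstance();
  private final JSONParser parser = new JSONParser();
  private RecordReader<LongWritable, Text> reader;

  @Override
  public InputFormat getInputFormat() throws IOException {
    return new TextInputFormat();
  }

  @Override
  public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
  }

  @SuppressWarnings("unchecked")
  @Override
  public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    this.reader = reader;
  }

  @Override
  public Tuple getNext() throws IOException {
    try {
      while (reader.nextKeyValue()) {
        String line = reader.getCurrentValue().toString();
        try {
          JSONObject json = (JSONObject) parser.parse(line);
          Map<String, Object> map = new HashMap<String, Object>(json);
          Tuple tuple = tupleFactory.newTuple(1);
          tuple.set(0, map);
          return tuple;
        }
        catch (ParseException e) {
          // Skip lines that are not valid JSON and move on to the next one
        }
      }
      return null; // no more input
    }
    catch (InterruptedException e) {
      throw new IOException(e);
    }
  }
}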
Using the two classes, the Pig Script for the same map-reduce task looks like:
comments = LOAD '/users/junittest/comments' USING com.welflex.hadoop.pig.load.PigJsonLoader() AS (commentMap:map[]);
words = FOREACH comments GENERATE FLATTEN(TOKENIZE(LOWER(commentMap#'comment')));
filter_words = FILTER words BY com.welflex.hadoop.pig.func.LikeFilter($0, '2012', 'bond', 'cave', 'james', 'walther');
grouped = GROUP filter_words BY $0;
counts = FOREACH grouped GENERATE group,COUNT(filter_words);
store counts INTO '/users/junittest/wcresults';
The unit test for Pig simply registers related jars and executes the script as follows:
public void pigMapReduce() throws Exception {
    // Get the jars required for the map reduce including custom functions
    Set<String> jars = getJarsForPig();

    // Set ExecType.MAPREDUCE if you want to run in a distributed mode
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE);

    for (String jar : jars) {
      // Register the jars for Pig      
      pigServer.registerJar(jar);
    }
    // Execute the pig script
    pigServer.registerScript(WordCountMapReduceTest.class
        .getResource("/wordcount.pig").toURI().toURL().getFile());
    // Post validation to make sure the results of the map-red are correct.
    validateResult();
}
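The getJarsForPig() helper used above is part of the project; one simple way to implement something like it is to walk the JVM classpath and pick up the jar entries (a sketch, not necessarily how the download does it; it needs java.io.File, java.util.HashSet and java.util.Set):

  private Set<String> getJarsForPig() {
    Set<String> jars = new HashSet<String>();
    String classpath = System.getProperty("java.class.path");

    for (String entry : classpath.split(File.pathSeparator)) {
      // Register only jar files; directories of classes are skipped in this sketch
      if (entry.endsWith(".jar")) {
        jars.add(entry);
      }
    }
    return jars;
  }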
The goal of the above demonstration is to be able to write the Pig script in a single location in your maven project and run a unit test of the same without having to re-write the script or handle registering custom jars. From the above example, one can see that the Pig script is far less complicated and less verbose than the Java version, and from an execution perspective quite performant as well. The example is organized as follows:
hadoop-example
|-- hadoop <-- Demonstrate the map - reduce of both Java and Pig versions
|   |-- pom.xml
|   `-- src
|       |-- main
|       |   |-- java
|       |   |   `-- com
|       |   |       `-- welflex
|       |   |           `-- hadoop
|       |   |               `-- hdfs
|       |   |                   |-- HdfsServiceImpl.java
|       |   |                   `-- HdfsService.java
|       |   |-- pig
|       |   |   `-- wordcount.pig
|       |   `-- resources
|       |       |-- core-site.xml
|       |       `-- log4j.properties
|       `-- test
|           |-- java
|           |   `-- com
|           |       `-- welflex
|           |           `-- hadoop
|           |               `-- mapred
|           |                   |-- CommentWordMapReduce.java
|           |                   `-- WordCountMapReduceTest.java
|           `-- resources
|               `-- comments
|-- pig-funcs <----- Contains the custom Pig artifacts
|   |-- pom.xml
|   `-- src
|       `-- main
|           `-- java
|               `-- com
|                   `-- welflex
|                       `-- hadoop
|                           `-- pig
|                               |-- func
|                               |   `-- LikeFilter.java
|                               `-- load
|                                   |-- JsonLineParser.java
|                                   `-- PigJsonLoader.java
`-- pom.xml
Simply execute:
>mvn test
to witness both the Java and Pig Map Reduce versions in action, or import the project into your favorite IDE and do the same. You can easily change the example to count by country or augment the size of the file and try a non-local mode of map-reduce. I must state that when working with Hadoop and family, one needs to be careful with the versions they are working with. The above mentioned example works with Cloudera's Hadoop version 0.20.2-CDH3B4.

Download the example here and happy Pigging. Oink out, or should I say Grunt out ;-)

Tuesday, January 4, 2011

Apache Cassandra Map Reduce - An Example

Happy New Year :-)! Starting this year with a small BLOG on using Map Reduce with Cassandra. Map Reduce on Cassandra has been supported via Hadoop since version 0.6. Hadoop Map Reduce jobs can retrieve data from Cassandra and reduce the same. There is a word count example available in the Cassandra distribution. In this BLOG I will be using the Word Count example against the super column I had defined in my previous BLOG on Cassandra. With 0.7 of Cassandra there is support to write the reduced output back to Cassandra itself.

For the scope of the example, I have used the Comments Column Family from my previous BLOG, and my goal is to find the counts of certain words that I am interested in, within a time slice range. The example provided creates multiple comments on a single blog entry and then runs a Hadoop map reduce job that outputs the counts of the words of interest into a column family that contains only the count of each word.
The Map Reduce job is provided a Slice Predicate specifying a time range of data to search on.
ByteBuffer startKey = ...;
ByteBuffer endKey = ....;

SliceRange range = new SliceRange();
range.setStart(startKey);
range.setFinish(endKey);
range.setCount(Integer.MAX_VALUE);
SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);

In the same vein, one could start different jobs across different time ranges to run simultaneously.
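For context, the rest of the job wiring roughly follows the word count example that ships with Cassandra; the keyspace and column family names below are illustrative, not necessarily the ones in the downloadable project:

Job job = new Job(conf, "comment word count");

// Read the Comments column family via Cassandra's Hadoop input format
job.setInputFormatClass(ColumnFamilyInputFormat.class);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "BlogApp", "Comments");

// Restrict the columns read to the chosen time range (the predicate built above)
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

// Write the reduced counts back into Cassandra
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "BlogApp", "WordCount");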

The Mapper is provided with the words we are interested in and only increments the counts of those words during the map process.
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException,
      InterruptedException {
      
      for (Map.Entry<ByteBuffer, IColumn> entry : columns.entrySet()) {
        IColumn column = entry.getValue();
        if (column == null) {
          continue;
        }
        
        IColumn textCol = column.getSubColumn(COMMENT_COL_NAME);
        String value = ByteBufferUtil.string(textCol.value());
     
        StringTokenizer itr = new StringTokenizer(value);
        while (itr.hasMoreTokens()) {
          String nextWord = itr.nextToken().toLowerCase();
          // Only trap expected words
          if (expectedWords.contains(nextWord)) {
            word.set(nextWord);
            context.write(word, one);
          }
        }
      }
    }
  }

The Reducer in turn reduces the same into a Cassandra family called Word Count similar to the Word count example provided by Cassandra.
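The reducer itself is not reproduced in this post; a minimal sketch of one that writes each word's total as a column via the ColumnFamilyOutputFormat (the "count" column layout here is illustrative) could look like:

  public static class WordCountReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    protected void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException,
      InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }

      // Each word becomes a row keyed by the word, holding a single "count" column
      Column column = new Column();
      column.setName(ByteBuffer.wrap("count".getBytes()));
      column.setValue(ByteBuffer.wrap(String.valueOf(sum).getBytes()));
      column.setTimestamp(System.currentTimeMillis());

      ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
      cosc.setColumn(column);
      Mutation mutation = new Mutation();
      mutation.setColumn_or_supercolumn(cosc);

      context.write(ByteBuffer.wrap(word.toString().getBytes()), Collections.singletonList(mutation));
    }
  }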

If you run the MapReduceTest, you can observe the following counts of the words I am interested in as output:
The word [james] has occured [1810] times
The word [2012] has occured [902] times
The word [cave] has occured [1368] times
The word [walther] has occured [481] times
The word [bond] has occured [2265] times
Note that I have made every word lower case in the example.
To run the example, download the same from HERE and run "mvn test". If I have not understood something correctly, please do let me know; although Map-Reduce is old, my experience with it is minimal :-)

Monday, November 8, 2010

Apache Cassandra with Hector - An Example

Recently I had been to the Strange Loop Conference in Saint Louis. While there I indulged in two things primarily, booze with old buddies and No SQL in the conference.

In particular, I found a lot of mention of Apache Cassandra. Why would one care about Cassandra? How about a 150 TB cluster spanning over 150 machines at Facebook? Cassandra is used by organizations such as Digg, Twitter, etc. who deal with large amounts of data. I could attempt to write more on Cassandra, but there is a great presentation by Eric Evans on the same: http://www.parleys.com/#id=1866&sl=40&st=5.

If not talking about Cassandra, what am I talking about? Well, I wanted to use Cassandra to get a grasp of how Columns and Super Columns work in Cassandra. Yeah, I hear it, WTF are Super Columns? I found myself asking the same question at the conference, but luckily for me I found this nice blog by Arin Sarkissian aptly titled "WTF is a SuperColumn?" explaining the same. I wanted to translate his example schema of a blog application into a working example that uses Cassandra and provide a playing ground for someone like me wanting to try Cassandra.

So I am only going to use Java; sorry, no Ruby or Scala for me right now. There is a Thrift Java client for Cassandra, but it is limited in functionality, so I proceeded to use Hector.

The model created was based on Arin's schema with a few enhancements. I have updated the Author schema to contain a user name and password, with the user name being the "row key" for the Authors column family.
<!--
    ColumnFamily: Authors
    We'll store all the author data here.

    Row Key = Author user name
    Column Name: an attribute for the entry (title, body, etc)
    Column Value: value of the associated attribute

    Access: get author by userName (aka grab all columns from a specific Row)

    Authors : { // CF
        sacharya : { // row key
            // and the columns as "profile" attributes
            password:#$%#%#$%#%
            name:Sanjay Acharya
            twitterId: sterling23,
            email: sacharya@example.com,
            biography: "bla bla bla"
        },
        // and the other authors
        dduck {
            ...
        }
    }
-->
<ColumnFamily CompareWith="BytesType" Name="Authors"/>
The above Column Family translated to a simple Author POJO as shown below:
public class Author {
  private String userName;
  private String password;
  private String name;
  private String twitterId;
  private String biography;
  ..// Getters and Setters
}
Using Hector directly, a DAO to create an author might look like:
public void create(Author author) {
    Mutator<String> mutator = HFactory.createMutator(keySpace, StringSerializer.get());
    
    String userName = author.getUserName();
    
    mutator.addInsertion(userName,COLUMN_FAMILY_NAME,
        HFactory.createColumn("password", author.getPassword(), StringSerializer.get(),
          StringSerializer.get()))
          .addInsertion(userName, COLUMN_FAMILY_NAME, 
            HFactory.createColumn("name", author.getName(), StringSerializer.get(), 
              StringSerializer.get()))
          .addInsertion(userName, COLUMN_FAMILY_NAME, 
            HFactory.createColumn("biography", author.getBiography(), StringSerializer.get(),
              StringSerializer.get()))
          .addInsertion(userName, COLUMN_FAMILY_NAME, 
            HFactory.createColumn("twitterId", author.getTwitterId(), StringSerializer.get(),
              StringSerializer.get()))
          .execute();
}
The above code felt rather verbose, so with a small compromise (column names match the attribute names of the POJO, and a default constructor must exist for the POJO) I present an AbstractColumnFamilyDao that, for example, an AuthorDao would extend:
public abstract class AbstractColumnFamilyDao<KeyType, T> {
  private final Class<T> persistentClass;
  private final Class<KeyType> keyTypeClass;
  protected final Keyspace keySpace;
  private final String columnFamilyName;
  private final String[] allColumnNames;

  public AbstractColumnFamilyDao(Keyspace keyspace, Class<KeyType> keyTypeClass, Class<T> persistentClass,
      String columnFamilyName) {
    this.keySpace = keyspace;
    this.keyTypeClass = keyTypeClass;
    this.persistentClass = persistentClass;
    this.columnFamilyName = columnFamilyName;
    this.allColumnNames = DaoHelper.getAllColumnNames(persistentClass);
  }

  public void save(KeyType key, T model) {
  
    Mutator<Object> mutator = HFactory.createMutator(keySpace, SerializerTypeInferer.getSerializer(keyTypeClass));
    for (HColumn<?, ?> column : DaoHelper.getColumns(model)) {
      mutator.addInsertion(key, columnFamilyName, column);
    }

    mutator.execute();
  }

  public T find(KeyType key) {
    SliceQuery<Object, String, byte[]> query = HFactory.createSliceQuery(keySpace,
      SerializerTypeInferer.getSerializer(keyTypeClass), StringSerializer.get(), BytesSerializer.get());

    QueryResult<ColumnSlice<String, byte[]>> result = query.setColumnFamily(columnFamilyName)
        .setKey(key).setColumnNames(allColumnNames).execute();

    if (result.get().getColumns().size() == 0) {
      return null;
    }

    try {
      T t = persistentClass.newInstance();
      DaoHelper.populateEntity(t, result);
      return t;
    }
    catch (Exception e) {
      throw new RuntimeException("Error creating persistent class", e);
    }
  }

  public void delete(KeyType key) {
    Mutator<Object> mutator = HFactory.createMutator(keySpace, SerializerTypeInferer.getSerializer(keyTypeClass));
    mutator.delete(key, columnFamilyName, null, SerializerTypeInferer.getSerializer(keyTypeClass));
  }
}
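With the abstract class in place, a concrete DAO shrinks to little more than a constructor. A sketch of what the AuthorDao might look like (the constructor argument order simply mirrors the abstract class above; the "Authors" column family name comes from the schema):

public class AuthorDao extends AbstractColumnFamilyDao<String, Author> {
  public AuthorDao(Keyspace keyspace) {
    super(keyspace, String.class, Author.class, "Authors");
  }

  // Author-specific finders could be added here
}

Saving and loading an author then reduces to calls like authorDao.save(author.getUserName(), author) and authorDao.find("sacharya").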
One might ask, why not just annotate the POJO with JPA annotations and thus handle the persistence? I did head down that route but found a project that was already doing this, namely Kundera. For this reason, I kept the example more focused on Hector. Also, I am a bit wary of whether the JPA specs are a good fit for a sparse column store like Cassandra.

With the above-mentioned DAO, I modeled the rest of my code on Arin's example schema. The sample code provided contains a Blog Simulation, a multi-threaded test that simulates the workings of the BLOG application: authors being created, BLOG entries being created, authors commenting on BLOG entries, finding all BLOG entries created, getting BLOG entries by tag, getting comments for a BLOG entry, and so on.

The example can be DOWNLOADED HERE. You will not need to install a Cassandra server as the example uses an embedded server. The code, however, does not demonstrate any failover or consistency strategies. Enjoy!

Friday, October 1, 2010

MongoDB with Morphia - An example

MongoDB is an open source, highly scalable, performant, document-oriented database, and I wanted to play with it. The database itself is feature rich, offering sharding, in-place updates and map reduce.

The database is written in C++ and uses JSON-style documents for mapping objects. Mongo's Java API provides the concept of an object that takes name-value pairs depicting the data and then stores it. There is a lack of type safety with this approach, as well as the effort of converting regular Java POJOs into the downstream Mongo object.

The type safety issue is addressed by the Morphia project, which allows for easy mapping of objects to and from MongoDB while also providing a querying interface. The API itself makes use of annotations, thus not requiring any configuration files. Think of it as Hibernate/JPA with annotations, for Mongo.

The API itself provides for access to Mongo directly if required. In this BLOG, I am trying out a simple example of using Morphia. I developed the project in the following steps:

1. Connection to mongo
2. POJO or Model
3. DAO

I have used a simple data model of an Order and its ancillary objects.

1. Connection to Mongo:
The Mongo object itself maintains a connection pool, so one does not need to create an additional pool. Take a look at the documentation for details.

I define a simple connection manager, a singleton that handles the initialization of a Morphia Datastore instance, as shown below:
public final class MongoConnectionManager {
  private static final MongoConnectionManager INSTANCE = new MongoConnectionManager();

  private final Datastore db;
  public static final String DB_NAME = "mydb";
  
  private MongoConnectionManager() {
    try {
      Mongo m = new Mongo("localhost", 27017);
      db = new Morphia().map(Order.class).map(LineItem.class).map(Customer.class).createDatastore(
        m, DB_NAME);
      db.ensureIndexes();
    }
    catch (Exception e) {
      throw new RuntimeException("Error initializing mongo db", e);
    }
  }

  public static MongoConnectionManager instance() {
    return INSTANCE;
  }
  ...
}

Also note that in the above code there is a call to db.ensureIndexes(). This method synchronously creates any missing indexes and continues seamlessly if they already exist.

2. Model:
I then defined my Order Model as shown below:
@Entity(value="orders", noClassNameStored = true)
public class Order {
  @Id
  private ObjectId id;
  @Reference
  private Customer customer;
  @Embedded
  private List<LineItem> lines;
    
  private Date creationDate;
  private Date lastUpdateDate;
  ...
  // Getters and setters
  ..
  
  @PrePersist
  public void prePersist() {
    this.creationDate = (creationDate == null) ? new Date() : creationDate;
    this.lastUpdateDate = (lastUpdateDate == null) ? creationDate : new Date();
  }
}
The Morphia annotations are not JPA, but they are very similar. As shown above, we map the Order class to the Mongo collection "orders" and mark the order identifier with the @Id annotation. As the type of the id is ObjectId, Mongo will generate the id automatically; if one uses any other type, the id must be set explicitly. Also note the explicit setting of noClassNameStored, as the class name would otherwise be stored by default. Storing the class name becomes useful when working with inheritance hierarchies, where the correct subclass needs to be instantiated on load.

Note that the Customer field carries the @Reference annotation, indicating that the Customer exists independently of the Order and that an existing Customer must be provided before an order can be persisted.

The @Embedded annotation on the line items indicates that they live within the scope of an order and do not exist independently of it.

The create and update dates have not been annotated but will be included in the MongoDB collection automatically. One could alternatively add the @Property annotation to them and provide a specific name under which the property would reside.

If there is a field one does not want persisted, marking it with @Transient will exclude it.
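To make the @Reference and @Embedded relationships concrete, here is a minimal sketch of what the ancillary classes might look like. Apart from lineNumber, quantity and product, which appear in the console output further below, the field names here are assumptions and not taken from the example download:

@Entity(value = "customers", noClassNameStored = true)
public class Customer {
  @Id
  private ObjectId id;
  private String firstName;
  private String lastName;
  @Transient
  private String scratchPad;   // illustrative only: never persisted
  // Getters and setters
}

@Embedded
public class LineItem {
  private int lineNumber;
  private int quantity;
  @Reference
  private Product product;     // stored as a reference to the "products" collection
  @Property("unitPrice")
  private double price;        // illustrative only: stored under the explicit name "unitPrice"
  // Getters and setters
}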

Also note the prePersist method tagged with the @PrePersist annotation, which is used to set the dates on the order prior to it being saved. Similar annotations exist: @PostPersist, @PreLoad and @PostLoad. The framework also supports the concept of EntityListeners for life cycle phases, so one can create an external class that responds to the different life cycle events like so:
@EntityListeners(OrderListener.class)
public class Order {
}

public class OrderListener {
  @PrePersist
  public void preCreate(Order order) {
    ...
  }
}

Viewing the order using the MongoDB console would display an order as such:
db.orders.find().forEach(printjson);
{
 "_id" : ObjectId("4ca4e5dbb95a4d64192cf119"),
 "customer" : {
  "$ref" : "customers",
  "$id" : ObjectId("4ca4e5dbb95a4d64172cf119")
 },
 "lines" : [
  {
   "lineNumber" : 1,
   "quantity" : 10,
   "product" : {
    "$ref" : "products",
    "$id" : "xbox"
   }
  }
 ],
 "creationDate" : "Thu Sep 30 2010 19:32:43",
 "lastUpdateDate" : "Thu Sep 30 2010 19:32:43"
}

3. DAO:
The Morphia framework provides a DAO class. Nice. The DAO class provides type-safe DAO behavior. For example, given a Customer entity:

DAO<Customer, String> customerDao = new DAO<Customer, String>(Customer.class, datastore);

// save
customerDao.save(new Customer());
// Read
Customer sanjay = customerDao.createQuery().field("lastName").equal("acharya").get();

The DAO class, however, exposes operations that might not be desired, such as dropCollection(). Decorating or extending it is easy enough to limit access to some of these methods while providing additional ones.

public class OrderDaoImpl extends DAO<Order, ObjectId> implements OrderDao {
  public OrderDaoImpl() {
    super(Order.class, MongoConnectionManager.instance().getDb());
  }
  ....

  @Override
  public List<Order> findOrdersByCustomer(Customer customer) {
   return createQuery().field("customer").equal(customer).asList();
  }

  @Override
  public List<Order> findOrdersWithProduct(Product product) {
    return createQuery().field("lines.product").equal(product).asList();
  }
}

From the above code, you can see that Morphia's query API provides an abstraction over Mongo's raw JSON queries. Type safety is largely present, although, through no fault of the framework, the field names themselves are not type safe. The "." notation is used to access nested fields.

Another example of a query involving multiple fields is shown below:
public List<Product> getProductsWithCategory(CategoryType type, double maxPrice) {
    return createQuery().field("price").lessThanOrEq(maxPrice).field("categoryType").equal(type).asList();
  }

Indexes can be applied to fields via the @Indexed annotation. For example, in the code shown below, the lastName property of a Customer is indexed in descending order.
@Indexed(value=IndexDirection.DESC, name="lastNameIndex", dropDups=false) 
  private String lastName;

I found Morphia pretty easy to use with Mongo. It would be nice if they supported the JPA annotations. There are still things I want to try, such as sharding and maybe Lucene integration for full-text search.

The example provided demonstrates Morphia in action with tests that use the model objects and DAOs. To get the tests running, download MongoDB, install it and have it running on the default port. Run "mvn test" in the project provided to see the tests, or import the project for a code review :-)

The example can be downloaded HERE....

Some links:
1. A presentation on Mongo and Java
2. 10 things about No-sql databases

Tuesday, September 14, 2010

XML Injection

It has been quite some time since I posted something. I have been looking into XML vulnerabilities and figured I'd share. The contents of this BLOG in no way reference any employer I have been involved with; they reflect solely my own interest in XML injection.

When developing Web services, one would typically like to keep them secure by protecting against both denial of service and security attacks.

So what is XML Injection? If a malicious user alters the contents of an XML document by injecting XML tags, then when an XML parser tries to parse the document, security exploits can be achieved. For the scope of this BLOG, I am not creating a Web Service but explaining in plain vanilla XML and SAX as to how the exploits can occur. The same concepts of course apply to Web Services dealing with XML.

Tag Injection:
Consider the following XML Document that represents an item that is submitted
for purchase.
<item> 
    <description>Widget</description>
    <price>500.0</price>
    <quantity>1</quantity>
</item>
The above XML is represented as a JAXB Object to which it would be un-marshalled as shown below:
@XmlRootElement
public class Item {
  private String description;
  private Double price;
  private int quantity;
 
 // Setters and getters
  ....
}
When the above XML is parsed by a SAX Parser, the resulting Item object is correctly matched up with the corresponding attributes.

Consider the XML fragment altered by a malicious user who was aware of the structure:
<item> 
    <description>Widget</description>
    <price>500.0</price>
    <quantity>1</quantity>
    <!-- Additional Rows below for price and quantity -->
    <price>1.0</price>
    <quantity>1</quantity>
</item>
When the above document is parsed, the second price and quantity elements are interpreted as overriding the first, and thus the price comes out as 1.00 instead of 500.00. One only needs to think of the ramifications of this successful injection. Always a fan of the dollar store :-).

So how can one prevent this from happening? Validating the received XML against an XSD will catch the flaw in the structure. The following is a simple XSD for the document:
<xs:schema
 xmlns:xs="http://www.w3.org/2001/XMLSchema"
 xmlns="http://www.welflex.com/item"
 elementFormDefault="qualified">
 <xs:element name="item">
  <xs:complexType>
   <xs:sequence>
    <xs:element name="description" type="xs:string"></xs:element>
    <xs:element name="price" type="xs:decimal"></xs:element>
    <xs:element name="quantity" type="xs:integer"></xs:element>
   </xs:sequence>
  </xs:complexType>
 </xs:element>
</xs:schema>
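For reference, here is a minimal sketch of wiring that schema into the JAXB unmarshalling step (the file names item.xsd and item.xml are assumptions):

// Attach the schema so the document is validated while unmarshalling
SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
Schema schema = schemaFactory.newSchema(new File("item.xsd"));

JAXBContext context = JAXBContext.newInstance(Item.class);
Unmarshaller unmarshaller = context.createUnmarshaller();
unmarshaller.setSchema(schema);   // injected structure now fails validation
Item item = (Item) unmarshaller.unmarshal(new File("item.xml"));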
Now when the SAX parser validates the malicious XML against the schema provided, an exception is raised to the effect of:
javax.xml.bind.UnmarshalException
 - with linked exception:
[org.xml.sax.SAXParseException: cvc-complex-type.2.4.d: Invalid content was found starting with element 'price'. No child element is expected at this point.]
 at javax.xml.bind.helpers.AbstractUnmarshallerImpl.createUnmarshalException(AbstractUnmarshallerImpl.java:315)
 at com.sun.xml.internal.bind.v2.runtime.unmarshaller.UnmarshallerImpl.createUnmarshalException(UnmarshallerImpl.java:503)
....

As to why an attacker would inject content rather than simply submit a whole new document with malicious data, I do not have a concrete answer; I can only suppose it might have to do with the target system validating which source systems it accepts documents from.

XXE or Xml EXternal Entity Attack:
External entity references in XML allow data from outside the main document to be embedded into the XML document. This "feature" allows a malicious user to gain access to sensitive information and/or mount a denial of service attack.

Consider the following innocuous XML fragment:
<Person>
  <FirstName>Sanjay</FirstName>
  <LastName>Acharya</LastName>
</Person>

Now, consider the same XML shown above with small modifications made by our friendly neighborhood attacker:
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]>
<Person>
  <FirstName>Sanjay</FirstName>
  <LastName>Acharya&xxe;</LastName>
</Person>

When an XML parser such as a SAX parser reads this XML in on, for example, a *NIX system, the contents of the /etc/passwd file are loaded into the resulting parsed document. If that document is returned to the person invoking the attack, well, you can imagine their glee at accessing this sensitive data.

The above XML read into a Person object would look like:
Person:Person [firstName=Sanjay, lastName=Acharyaroot:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/bin/sh
bin:x:2:2:bin:/bin:/bin/sh
sys:x:3:3:sys:/dev:/bin/sh
.........
couchdb:x:106:113:CouchDB Administrator,,,:/var/lib/couchdb:/bin/bash
haldaemon:x:107:114:Hardware abstraction layer,,,:/var/run/hald:/bin/false
speech-dispatcher:x:108:29:Speech Dispatcher,,,:/var/run/speech-dispatcher:/bin/sh
kernoops:x:109:65534:Kernel Oops Tracking Daemon,,,:/:/bin/false
saned:x:110:116::/home/saned:/bin/false
pulse:x:111:117:PulseAudio daemon,,,:/var/run/pulse:/bin/false
gdm:x:112:119:Gnome Display Manager:/var/lib/gdm:/bin/false
johndoe:x:1000:1000: John Doe,,,:/home/johndoe:/bin/bash

If the server program parsing the XML were running as root, the attacker could also access the /etc/shadow file. Using external entity injection, the possibility of retrieving sensitive information, or of creating a recursive expansion and thus a denial of service, is definitely enticing for an attacker.

Clearly, the way to prevent this is either to scan requests at the network level or to strictly control which entities can be resolved. A strategy to combat the attack is explained at Secure Coding.

Another option to consider is to provide a custom SAXParserFactory that employs the EntityResolver mentioned on the Secure Coding site but is made available either for the entire VM or for a particular module. One can register a custom SAXParserFactory class in a jaxp.properties file in the jre/lib directory or via the META-INF/services mechanism of an individual module.

An example of a Filtering SAX Parser Factory that could be employed using one of the above mentioned strategies is shown below. The factory delegates to the default factory to create a parser but then adds an EntityResolver to the parser before providing it back to the caller.
public class FilteringSaxParserFactory extends SAXParserFactory {
  // Delegate to this parser
  private SAXParserFactory delegate;
  
  // Delegate class
  private static final String DELEGATE_CLASS = "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl";
  
  // Allowed Entity Paths
  private Set<String> allowedEntityPaths;
  
  public FilteringSaxParserFactory() {
    delegate = SAXParserFactory.newInstance(DELEGATE_CLASS, Thread.currentThread().getContextClassLoader());
    delegate.setNamespaceAware(true);
    delegate.setValidating(true);
    allowedEntityPaths = new HashSet<String>();
    allowedEntityPaths.add("/usr/local/entity/somefile");
  }
  
  @Override
  public boolean getFeature(String name) throws ParserConfigurationException,
    SAXNotRecognizedException,
    SAXNotSupportedException {
    return delegate.getFeature(name);
  }

  @Override
  public SAXParser newSAXParser() throws ParserConfigurationException, SAXException {
    SAXParser parser = delegate.newSAXParser();
    XMLReader xmlReader = parser.getXMLReader();
    xmlReader.setEntityResolver(new EntityResolver() {
      
      @Override
      public InputSource resolveEntity(String publicId, String systemId) throws SAXException,
        IOException {
        if (allowedEntityPaths.contains(systemId)) {
          return new InputSource(systemId);
        }
        
        // Return blank path to prevent untrusted entities
        return new InputSource();
      }
    });
    
    return parser;
  }
  ....
}
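A sketch of how the factory could then be picked up; the package name com.welflex.xml is an assumption:

// JVM-wide: add this line to jre/lib/jaxp.properties
//   javax.xml.parsers.SAXParserFactory=com.welflex.xml.FilteringSaxParserFactory
// Per module: place the same class name in META-INF/services/javax.xml.parsers.SAXParserFactory

SAXParserFactory factory = SAXParserFactory.newInstance(); // resolves to FilteringSaxParserFactory
SAXParser parser = factory.newSAXParser();                 // parser already carries the restrictive EntityResolver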

XML Bomb Attack:
Another form of XML attack is what's known as an XML bomb. The bomb is a small XML fragment that makes the provided data grow exponentially while the document is parsed, leading to extensive memory consumption and opening the door to a denial of service attack.

Consider the following XML Bomb:
<!DOCTYPE item [
       <!ENTITY item "item">
       <!ENTITY item1 "&item;&item;&item;&item;&item;&item;">
       <!ENTITY item2 "&item1;&item1;&item1;&item1;&item1;&item1;&item1;&item1;&item1;">
       <!ENTITY item3 "&item2;&item2;&item2;&item2;&item2;&item2;&item2;&item2;&item2;">
       <!ENTITY item4 "&item3;&item3;&item3;&item3;&item3;&item3;&item3;&item3;&item3;">
       <!ENTITY item5 "&item4;&item4;&item4;&item4;&item4;&item4;&item4;&item4;&item4;">
       <!ENTITY item6 "&item5;&item5;&item5;&item5;&item5;&item5;&item5;&item5;&item5;">
       <!ENTITY item7 "&item6;&item6;&item6;&item6;&item6;&item6;&item6;&item6;&item6;">
       <!ENTITY item8 "&item7;&item7;&item7;&item7;&item7;&item7;&item7;&item7;&item7;">
      ]>
      <item>
        <description>&item8;</description>
        <price>500.0</price>
        <quantity>1</quantity>
       </item>

When attempting to parse the above fragment, the SAX parser will stop (a safeguard introduced in JDK 1.4.2) with the following error:
javax.xml.bind.UnmarshalException
 - with linked exception:
[org.xml.sax.SAXParseException: The parser has encountered more than "64,000" entity expansions in this document; this is the limit imposed by the application.]
 at javax.xml.bind.helpers.AbstractUnmarshallerImpl.createUnmarshalException(AbstractUnmarshallerImpl.java:315)
 ... 26 more
Note that the parser complains about finding more than 64,000 entity expansions; the number of allowed entity expansions can be controlled via the "-DentityExpansionLimit" system property.

A lot of the above-mentioned scenarios can be mitigated by enforcing XSD validation and not using DTDs. DTDs can be disallowed entirely by setting the feature "http://apache.org/xml/features/disallow-doctype-decl" to true. If set, any XML being parsed that contains a DOCTYPE declaration will cause a fatal parsing error.
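A minimal sketch of turning that feature on:

SAXParserFactory factory = SAXParserFactory.newInstance();
factory.setNamespaceAware(true);
// Reject outright any document that carries a DOCTYPE declaration
factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
SAXParser parser = factory.newSAXParser();  // parsing XML with a DOCTYPE now fails with a fatal SAXParseException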

An example demonstrating the XML exploits can be downloaded HERE. Note that to witness the XXE injection, one would need to run it on a *NIX system. The example provided does not upload private information and is only for demonstration purposes.

Oh well, I am keeping this BLOG limited in content; I have not even looked into XPath injection. Quite interesting. I only wonder how many Web services are out there against which tag injection exploits can be used.

Links I found of value:
1. Preventing External Entity Attacks
2. Testing for XML Injection

Tuesday, April 6, 2010

BatchMessageListenerContainer using Spring and MessageProxies

A couple of things I wanted to share in this BLOG: first, a way to batch-consume messages using Spring's listener containers, and second, an experiment in using dynamic proxies to send messages, along with an application of the same.

1. Batch Message Listener:
When using the Spring listener container, one typically consumes a single message at a time. This works in most scenarios. There might, however, be scenarios where consuming multiple messages and performing a task in batch is more efficient. Clearly, one way to ensure batching is to have the message itself be an aggregation of multiple payloads that are processed together. From a consumer perspective, one can process this single message as an atomic unit and either consume or reject it. This works only if the producer of the messages can and will group the messages together, and it also means that the producer needs an understanding of the consumer's needs regarding batch efficiency.

What about another way of handling this scenario, where a batch message listener is used? In this proposal, one receives X messages together as a unit and either processes them all or rolls them all back. A consumer can now capitalize on the performance gains of batching. There is, of course, a cost as far as the messaging system is concerned, as it needs to hold the messages in the batch as unprocessed for as long as the JMS transaction is active.

I found that the spring-batch project at one time had a BatchMessageListenerContainer that did some batching. I could not find it in later versions of the framework, so I created one myself. It is based on the DefaultMessageListenerContainer and has the following requirements:
  1. Receive at most X messages and process the same as a unit of work.
  2. If fewer than X messages (say X-dX) have been received when a JMS session receive timeout occurs, process the X-dX received messages even though the batch size was not met.
  3. Commit or rollback applies to the entire set of messages. This means that if there is a bad message in the bunch, all messages in the batch are rolled back.
  4. Only JMS transacted sessions are supported; user transactions are not supported at this point (though adding them should be easy).

As the SessionAwareMessageListener from Spring does not have a signature that supports multiple messages, I created one called SessionAwareBatchMessageListener:
public interface SessionAwareBatchMessageListener<M extends Message>{
  /**
   * Perform a batch action with the provided list of {@code messages}.
   * 
   * @param session JMS {@code Session} that received the messages
   * @param messages List of messages to be processed as a unit of work
   * @throws JMSException JMSException thrown if there is an error performing the operation.
   */
  public void onMessages(Session session, List<M> messages) throws JMSException;
}

The BatchMessageListenerContainer that I am demonstrating is an extension of the DefaultMessageListenerContainer and accepts the newly created listener. Note that all this is possible due to the beauty of the Spring code's design, which allows for such extensions.

The container receives messages until either the batch size is hit or a JMS session receive timeout occurs, and then dispatches the batch to the listener to complete the operation. The code is a bit verbose but is shown in its entirety below:
public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
  public static final int DEFAULT_BATCH_SIZE = 100;
  private int batchSize = DEFAULT_BATCH_SIZE;

  public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
  
  /**
   * The doReceiveAndExecute() method has to be overridden to support multiple-message receives.
   */
  @Override
  protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
    TransactionStatus status) throws JMSException {
    Connection conToClose = null;
    MessageConsumer consumerToClose = null;
    Session sessionToClose = null;

    try {
      Session sessionToUse = session;
      MessageConsumer consumerToUse = consumer;
  
      if (sessionToUse == null) {
        Connection conToUse = null;
        if (sharedConnectionEnabled()) {
          conToUse = getSharedConnection();
        }
        else {
          conToUse = createConnection();
          conToClose = conToUse;
          conToUse.start();
        }
        sessionToUse = createSession(conToUse);
        sessionToClose = sessionToUse;
      }
      
      if (consumerToUse == null) {
        consumerToUse = createListenerConsumer(sessionToUse);
        consumerToClose = consumerToUse;
      }
      
      List<Message> messages = new ArrayList<Message>();

      int count = 0;
      Message message = null;
      // Attempt to receive messages with the consumer
      do {
        message = receiveMessage(consumerToUse);
        if (message != null) {
          messages.add(message);
        }
      }
      // Exit loop if no message was received in the time out specified, or
      // if the max batch size was met
      while ((message != null) && (++count < batchSize));

      if (messages.size() > 0) {
        // Only if messages were collected, notify the listener to consume the same.
        try {
          doExecuteListener(sessionToUse, messages);
          sessionToUse.commit();
        }
        catch (Throwable ex) {
          handleListenerException(ex);
          if (ex instanceof JMSException) {
            throw (JMSException) ex;
          }
        }
        return true;
      }

      // No message was received for the period of the timeout, return false.
      noMessageReceived(invoker, sessionToUse);
      return false;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumerToClose);
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
    }
  }

  protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
    if (!isAcceptMessagesWhileStopping() && !isRunning()) {
      if (logger.isWarnEnabled()) {
        logger.warn("Rejecting received messages because of the listener container "
          + "having been stopped in the meantime: " + messages);
      }
      rollbackIfNecessary(session);
      throw new JMSException("Rejecting received messages as listener container is stopping");
    }

    @SuppressWarnings("unchecked")
    SessionAwareBatchMessageListener<Message> lsnr = (SessionAwareBatchMessageListener<Message>) getMessageListener();

    try {
      lsnr.onMessages(session, messages);
    }
    catch (JMSException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (RuntimeException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (Error err) {
      rollbackOnExceptionIfNecessary(session, err);
      throw err;
    }
  }
  
  @Override
  protected void checkMessageListener(Object messageListener) {
    if (!(messageListener instanceof SessionAwareBatchMessageListener)) {
      throw new IllegalArgumentException("Message listener needs to be of type ["
        + SessionAwareBatchMessageListener.class.getName() + "]");
    }
  }
 
  @Override
  protected void validateConfiguration() {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
    }
  }
}
There is a demonstration in the attached code that shows how messages can be received in batch and processed by the consumer.
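For flavor, here is a minimal wiring sketch; the connection factory, destination name and listener body are assumptions and not lifted from the attached example:

BatchMessageListenerContainer container = new BatchMessageListenerContainer();
container.setConnectionFactory(connectionFactory);          // any javax.jms.ConnectionFactory
container.setDestinationName("com.welflex.person.queue");   // assumed queue name
container.setSessionTransacted(true);                       // the container only supports JMS transacted sessions
container.setBatchSize(200);
container.setMessageListener(new SessionAwareBatchMessageListener<TextMessage>() {
  @Override
  public void onMessages(Session session, List<TextMessage> messages) throws JMSException {
    // Process all the messages as one unit of work; throwing here rolls the whole batch back
  }
});
container.afterPropertiesSet();
container.start();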

2. Proxy pattern for JMS Messaging Sending:
Recently I have been seeing quite a few REST Clients that support a Proxy pattern and wanted to experiment to see whether the same can be applied to JMS as well.

The RESTful system has some similarities with JMS:
  1.  Resource location can be compared to JMS Destination
  2.  Mime type can be compared to JMS Message Type
So if we defined a couple of annotations, one for Destination JNDI and a second for MessageType, a proxy interface could look like the following:
public interface MessageSender {
    @DestinationJndi("com.welflex.barqueue")
    @MessageType(ObjectMessage.class)
    public void sendObjectMessage(String s);

    @DestinationJndi("com.welflex.baz.queue")
    @MessageType(TextMessage.class)
    public void sendStringMessage(String s);

    @DestinationJndi("com.welflex.boo.queue")
    @MessageType(MapMessage.class)
    public void sendMapMessage(Map<String, Object> map);
  }
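The two annotations themselves could be declared along these lines (a guess at their shape; the real definitions ship with the example):

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DestinationJndi {
  String value();
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageType {
  Class<? extends Message> value();
}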
We could then create a Proxy based on the above interface and send messages as shown below:
public void testMethod() {
    SendExecutor executor = new ActiveMQSendExecutor("vm://localhost", "foo", "bar");
    MessageSender s = ProxyFactory.create(MessageSender.class, executor);
    s.sendObjectMessage("Foo");
    s.sendStringMessage("Bar");
    
    Map<String, Object> m = new HashMap<String, Object>();

    m.put("name", "Sanjay");
    m.put("age", 23);
    m.put("salary", 12313131.213F);    

    s.sendMapMessage(m);  
}  
I am not explaining the details of how the proxy is created, as the code is available in the downloadable example. One can easily swap out the ActiveMQSendExecutor with a WebLogicSendExecutor, an OpenJMSSendExecutor, or any other JMS provider implementation.
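That said, the gist of the plumbing is a JDK dynamic proxy. A rough sketch only; in particular the SendExecutor.send() signature here is an assumption, the real contract is in the download:

public final class ProxyFactory {
  @SuppressWarnings("unchecked")
  public static <T> T create(Class<T> iface, final SendExecutor executor) {
    return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface },
      new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          // Read the routing metadata off the invoked interface method
          String destinationJndi = method.getAnnotation(DestinationJndi.class).value();
          Class<? extends Message> messageType = method.getAnnotation(MessageType.class).value();
          // Hand the payload to the provider-specific executor (assumed signature)
          executor.send(destinationJndi, messageType, args[0]);
          return null;
        }
      });
  }
}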

So there are many questions that arise with the above:
  1. What about controlling the message properties? Well, the same question applies to the header properties one would need to handle with REST proxy clients. One can define an execution interceptor to provide the same :-)
  2. JMS is so simple; is another layer of abstraction really worth it? Well I agree, mostly, but if all a client needs to do is send a particular type of message and we can abstract away the boilerplate, why not? Sure, there is some initial development in creating the send executors, but once that is done, it is as easy as annotating an interface and dispatching :-)
  3. What about BytesMessage and other message types? I have not handled those.
Clearly there are more questions here. The above is just one of my Frankenstein experiments that I thought I'd share :-)

3. Examples:
Examples that utilize the above code are available in the provided download. A sample batch consumer is demonstrated that persists Person entries to an HSQL database using Hibernate. The batch size is 200 messages at a time, and Hibernate flushes the session every hundred records.

Usage of the proxy classes for sending different types of messages is also demonstrated.

Download a Maven Example from HERE.