Search This Blog

Friday, January 30, 2009

Spring JmsTemplate WebLogic

In an earlier blog, I had expressed my concern regarding the JmsTemplate from the Spring Framework and that it promotes a bad design pattern, i.e., where JMS artifacts are created and closed across every send/receive.

Spring's SingleConnectionFactory class pools a single connection for every template nicely and tends to alleviate the problem. However, it fails to pool Session, Producer and Consumer artifacts. As mentioned by James Strachan and others, the same can lead to performance degradation.

With the SingleConnectionFactory, Spring definitely started off on the right foot, however, the same is not enough. The other artifacts derived from javax.jms.Connection bring their own penalty and the cost of the same is probabaly different depending on the JMS Provider used.

When using WebLogic, I found that clearly the most performant way to send messages is without using the JmsTemplate, i.e., obtaining a Connection, Session, Producer/Consumer and re-using the same across multiple sends or receives. Using the JmsTemplate does impact the performance even though Spring's SingleConnectionFactory re-uses the Connection object, there seems to be a penalty that is encountered due to Session and Producer/Consumers not being re-used.

Is it is possible to re-use the Session and Producer/Consumers via the JmsTemplate ? Sure, a simple custom implementation of the Connection Factory that proxies and re-uses the Session and Producer/Consumer artifacts can lead to a performance gain.

Using the Dynamic Proxy pattern, one can create a custom ConnectionFactory that wraps the connection factory that is obtained from the JMS Provider. The custom Connection factory like Spring's SingleConnectionFactory class maintains a single JMS Connection which is re-used across every send/receive. However, unlike Spring's SingleConnectionFactory, the connection returned is a Proxy that will attempt to re-use Session objects. The Session objects created by the Connection Proxy are also Proxies and will pool Producer/Consumer objects. Doing pooling all the way down allows for re-use of created JMS Objects. I created a simple example of the same that does the pooling. Nothing too exotic, just a simple, and most likely bug filled ;-) library as shown below:

Custom Connection Factory that Creates a Single Connection:



public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
TopicConnectionFactory, DisposableBean {

private final ConnectionFactory delegate;
private Connection singleConnection;

private String clientID;
private ConnectionInvocationHandler dynamicConnectionProxy;

public SingleConnectionFactory(ConnectionFactory delegate) {
this.delegate = delegate;

}

// Create the sole connection
private synchronized Connection createSingleConnection() throws JMSException {

if (singleConnection != null) {
return singleConnection;

}

Connection conn = delegate.createConnection();

if (clientID != null) {
conn.setClientID(clientID);

}

List<Class<? extends Connection>> classes = new ArrayList<Class<? extends Connection>>(3);

classes.add(Connection.class);

if (conn instanceof QueueConnection) {

classes.add(QueueConnection.class);
}

if (conn instanceof TopicConnection) {

classes.add(TopicConnection.class);
}

dynamicConnectionProxy = new ConnectionInvocationHandler(conn);


singleConnection = (Connection) java.lang.reflect.Proxy.newProxyInstance(delegate.getClass()

.getClassLoader(), (Class[]) classes.toArray(new Class[classes.size()]),
dynamicConnectionProxy);

return singleConnection;
}

public Connection createConnection() throws JMSException {

return createSingleConnection();
}
....
.....
public void destroy() throws Exception {
synchronized (this) {

JmsUtils.closeConnection(singleConnection);
dynamicConnectionProxy.destroy();
}

}
}





Connection Invocation Handler that factories Session Proxies:



public class ConnectionInvocationHandler implements InvocationHandler, DisposableBean {

private Connection delegate;
// WSession is a custom interface that extends Session, QueueSession
// and TopicSession
private Pool<WSession> sessionPool;

public ConnectionInvocationHandler(Connection delegate) {
this.delegate = delegate;

this.sessionPool = new Pool<WSession>();
}

public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {

if (isSessionCreationMethod(method)) {

return sessionPool.take(new Creator<WSession>() {

public WSession create() {
try {
Session sessionDelegate = (Session) method.invoke(delegate, args);

if (sessionDelegate == null) {
throw new RuntimeException("Unable to Create Session Delegate");

}

WSession s = (WSession) Proxy.newProxyInstance(Session.class.getClassLoader(),
new Class[] { WSession.class }, new SessionInvocationHandler(sessionDelegate,
ConnectionInvocationHandler.this));

return s;
}
catch (Exception e) {

throw new RuntimeException("Error creating", e);
}
}

});
}
else if (method.getName().equals("close")) {

// Do not close delegate here.
return null;
}
else if (method.getName().equals("setClientID")) {

// Client ID is for the single connection only.
String currentClientId = this.delegate.getClientID();

if (currentClientId != null && currentClientId.equals(args[0])) {

return null;
}
else {
throw new javax.jms.IllegalStateException(

"setClientID method is not allowed on the proxy. Set it explicitly on the Factory instead. ");
}
}

return method.invoke(delegate, args);

}

void releaseSession(WSession session) throws Exception {

sessionPool.give(session);
}
....
....
/**
* Shut down all open Connections, Sessions, Producers and Consumers.
*/

public void destroy() throws Exception {

sessionPool.destroy();
}
}





Session Invocation Handler that Proxies Producer and Consumer Proxies:



public class SessionInvocationHandler implements InvocationHandler {

private Session delegate;
private ConnectionInvocationHandler connectionProxy;
private Pool<WSender> producerPool;

private Pool<WConsumer> consumerPool;

public SessionInvocationHandler(Session delegate, ConnectionInvocationHandler connectionProxy)

throws JMSException {
this.delegate = delegate;

this.connectionProxy = connectionProxy;
this.producerPool = new Pool<WSender>();

this.consumerPool = new Pool<WConsumer>();
}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

if (isSenderMethod(method)) {
return getSender(method, args);

}
else if (isReceiverMethod(method)) {

return getConsumer(method, args);
}
else if (method.getName().equals("close")) {

// Set object's closed flag to true
try {
connectionProxy.releaseSession((WSession) proxy);

}
catch (Exception e) {
JmsUtils.closeSession(delegate);

}

return null;
} else if (method.getName().equals("destroy")) {

producerPool.destroy();
consumerPool.destroy();
JmsUtils.closeSession(delegate);

connectionProxy = null;
return null;
}

return method.invoke(delegate, args);
}

// Assume duplicate of this code for Consumer
WSender getSender(final Method method, final Object args[]) throws Throwable {

return producerPool.take(new Creator<WSender>() {
public WSender create() {

try {
MessageProducer producerDelegate = (MessageProducer) method.invoke(delegate, args);

WSender s = (WSender) Proxy.newProxyInstance(MessageProducer.class.getClassLoader(),
new Class[] { WSender.class }, new ProducerInvocationHandler(producerDelegate,
SessionInvocationHandler.this));

return s;
}
catch (Exception e) {

throw new RuntimeException("Error Creating Sender", e);
}
}

});
}
....
....

void release(WSender sender) throws Exception {

producerPool.give(sender);
}
...
}




The Production Invocation Handler (Consumer is similar):



public class ProducerInvocationHandler implements InvocationHandler {

private MessageProducer delegate;
private SessionInvocationHandler sessionProxy;

public ProducerInvocationHandler(MessageProducer delegate, SessionInvocationHandler sessionProxy)

throws JMSException {
this.delegate = delegate;

this.sessionProxy = sessionProxy;
}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

if (method.getName().equals("close")) {

try {
sessionProxy.release((WSender) proxy);

} catch (Exception e) {
JmsUtils.closeMessageProducer(delegate);

}

return null;
} else if (method.getName().equals("destroy")) {
JmsUtils.closeMessageProducer(delegate);

sessionProxy = null;
return null;
}


return method.invoke(delegate, args);
}
}




A Sample Unit Test:



@Test
public void testWithoutSpring() throws JMSException {

Connection conn = connectionFactory.createConnection();
conn.start();

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer producer = session.createProducer(queue);
StopWatch w = new StopWatch();

w.start();
for (int i = 0; i < MAX_MESSAGES; i++) {

producer.send(messageCreator.createMessage(session));
}

w.stop();
System.out.println("Sans Spring Jms Template:" + w.getTotalTimeMillis());

}


@Test
public void testSpringJmsTemplate() {

System.out.println("Spring Jms Template Without Single Connection Factory:" + executeTemplateTest(connectionFactory));
}


@Test
public void testSpringJmsTemplateSingleConnFactory() {
org.springframework.jms.connection.SingleConnectionFactory f
= new org.springframework.jms.connection.SingleConnectionFactory(connectionFactory);

System.out.println("Spring Jms Template with Single Connection Factory:" + executeTemplateTest(f));
}


@Test
public void testPoolingFactory() throws Exception {

SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(connectionFactory);
System.out.println("Spring JmsTemplate With Pooled Custom Factory:" + executeTemplateTest(singleConnectionFactory));

singleConnectionFactory.destroy();
}

private long executeTemplateTest(ConnectionFactory connectionFactory) {

JmsTemplate template = new JmsTemplate(connectionFactory);
template.setDefaultDestination(queue);


StopWatch watch = new StopWatch();
watch.start();

for (int i = 0 ; i < MAX_MESSAGES; i++) {

template.send(messageCreator);
}

watch.stop();

return watch.getTotalTimeMillis();
}




Each of the above tests clears the JMS Queue of all messages before starting. In addition, the cost of obtaining JMS Resources like Connection Factory and Queue are abstracted to the "before" method of the test. When I ran tests across my local WebLogic installation, I found the following results with all time measured in milli seconds:

100 Messages Sent:
Sans Spring Jms Template:109
Spring Jms Template Without Single Connection Factory:1763
Spring Jms Template with Single Connection Factory:1025
Spring JmsTemplate With Pooled Custom Factory:207

1000 Messages Sent:
Sans Spring Jms Template:1239
Spring Jms Template Without Single Connection Factory:18049
Spring Jms Template with Single Connection Factory:6135
Spring JmsTemplate With Pooled Custom Factory:2326

5000 Messages Sent:
Sans Spring Jms Template:6618
Spring Jms Template Without Single Connection Factory:52916
Spring Jms Template with Single Connection Factory:25944
Spring JmsTemplate With Pooled Custom Factory:8894

So here lies my take on using JmsTemplate with WebLogic. Try not to use it but instead obtain Connection, Session, Producer/Consumers and reuse them. If you have to use JmsTemplate and are not sending many messages, then use the SingleConnectionFactory provided by Spring in conjunction with JmsTemplate.

If many messages are being sent in succession then creating a custom factory that pools Session and Producer/Consumer objects would be the way to go IMO.

The sample code that I have does not handle idle time out for the JMS artifacts. Building the same into the proxies should be a trivial effort. In addition to the above optimization, one can easily build into the Proxies, re-connect logic for Connection and Session, Producer/Consumers. WebLogic transparent reconnect logic, in the event of failure of a Server in a Cluster, cannot always be guaranteed (look at the OTN forums).

I would be interested to know how the JmsTemplate performs with other JMS Providers. In addition, if my bearings are totally off, I would like to know that as well ;-)..An please I am talking of this Blog only ;-)

wljmshelper is the library that represents the code for the above and can be downloaded from HERE The project is a Maven 2 project and one should be able to compile the same as it references the JMS API. For compilation there are no WebLogic specifics. However for running the tests, one would require the WebLogic Client, i.e., wlfullclient.jar. Further you would need a weblogic installation :-) in order to run the tests.

Disclaimer:
The above mentioned code is an experiment. It has not undergone testing and is not even open source and is created purely for the intent of demonstration.

Saturday, January 24, 2009

Spring Jms Support

My year at Overstock.com has been just superb from a learning front. I have had the oppurtunity to work RESTful Web Services, Maven, Hibernate and in general be a part of a super architecture and development team at play.

I have also been able to contribute in the selecting of a JMS provider/architecture for reliable messaging at Overstock. The provider that we have selected is Oracle WebLogic.

As we enter the world of WebLogic JMS, I find that there are items that are best abstracted away such that developers of an application can easily utilize the provider without having to repeat code. Spring's JMS support feels ideal for the same.

Obtaining JMS Artifacts from JNDI:



Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, "t3://localhost:8000,localhost:8002");
p.put(Context.SECURITY_PRINCIPAL, "weblogic");
p.put(Context.SECURITY_CREDENTIALS, "weblogic");

JndiTemplate template = new JndiTemplate(p);

QueueConnectionFactory connFactory = (QueueConnectionFactory)
template.execute(new JndiCallBack() {
public Object doInContext(Context ctx) {
return (QueueConnectionFactory) ctx.lookup("connectionFactory");
}
});
....




JmsTemplate:
Spring Framework provides a Template class called JmsTemplate much like HibernateTemplate for Hibernate DAO. The class provides many convinience methods for sending recieving messages by providing an abstraction layer over the JMS API. There are methods such as send/receive or methods that accept Call Backs. I like some of the safety these methods provide where Connections, Sessions, Producers and Consumers are gracefully closed after use of them. WebLogic documentation explicitly states that Connection objects must be closed after using them.

That said, I also fear that this class is very open for misuse. James Strachan has mentioned some of the PIT falls that one needs to be aware of when using JmsTemplate. The primary concern that surfaces with the JmsTemplate creating a Connection/Session/Producer or Consumer across every send/receive/callback. This can easily be misused and in the case of WebLogic, attempting to use the template with the default Connection Factory degrades the performance of operations. I fear that directly using the JmsTemplate can lead to code like:



jmsTemplate.send(new MessageCreator() {
public Message send(Session session) throws JMSException {
return session.createTextMessage("Order Submited");
}
});

... do some stuff..
jmsTemplate.send(new MessageCreator() {
public Message send(Session session) throws JMSException {
return session.createTextMessage("Order completed");
}
});



For each of the send operations above, a Connection, Session and Producer will be created. If the above sequence is being called many times over, this would present a performance problem.

With CallBacks like Connection/Session or Producer, re-use has more of chance as within a call back one could send multiple messages thus re-using the jms objects. However, if these operations are many, than these too can suffer misuse and cause performance degradation. JmsTemplate works I guess when you send a message once in a while.

James observation regarding the use of JmsTemplate is very astute and using JmsTemplate might not be best of options unless "Pooling" is introduced into the mix.

Spring Message Container:
Spring offers support for Message Driven POJOs (MDP) in the form of MessageListenerContainers. The Container is responsible for retrieving a message from a JMS destination and then notifying the listener associated with the Container. One nice feature of the Container is that it provides for re-connect logic in event of failure of a server in a Cluster by allowing transparent reconnect to different server in the cluster. Look at this POSTING about a Reconnecting Message Listener. The Spring Message ListenerContainer's tend to Pool Jms artifacts as well. In particular the DefaultMessageListenerContainer provides XA support as well. Using a Container to receive messages is as simple as:




SimpleMessageContainer container = new SimpleMessageContainer();
container.setConnectionFactory(connectionFactory);
container.setDestination(queue);
// Set how many concurrent consumers
container.setConcurrentConsumers(10);
consumer.setMessageListener(new javax.jms.MessageListener() {
public void onMessage(Message m) {
..do some stuff...
}
});
container.start();




Message Converters:
I feel that the SimpleMessageConverter class has some nice convinience methods for converting from POJO's to JMS Messages and vice versa. Message Converters are IMO a great concept for message translation. No whining here.

All in all, I think using Spring's MessageContainer is advisable considering it has all the logic for reconnect, multi-threading, XA etc all build in. Using the Converters has benefit and the JndiTemplate serves to abstract away boiler plate code. However, using Jms Template needs to be done with great care with Pooling almost a must. Jenks might be an option.