Spring's SingleConnectionFactory class re-uses a single connection across every JmsTemplate call, which tends to alleviate the problem. However, it does not pool Session, Producer and Consumer artifacts. As mentioned by James Strachan and others, this can lead to performance degradation.
With the SingleConnectionFactory, Spring definitely started off on the right foot, but that alone is not enough. The other artifacts derived from javax.jms.Connection bring their own penalty, and the cost probably differs depending on the JMS Provider used.
When using WebLogic, I found that the most performant way to send messages is clearly without the JmsTemplate, i.e., obtaining a Connection, Session and Producer/Consumer and re-using them across multiple sends or receives. Using the JmsTemplate does hurt performance: even though Spring's SingleConnectionFactory re-uses the Connection object, there seems to be a penalty because Sessions and Producers/Consumers are not re-used.
Is it possible to re-use the Session and Producer/Consumers via the JmsTemplate? Sure. A simple custom implementation of the ConnectionFactory that proxies and re-uses the Session and Producer/Consumer artifacts can lead to a performance gain.
Using the Dynamic Proxy pattern, one can create a custom ConnectionFactory that wraps the connection factory obtained from the JMS Provider. Like Spring's SingleConnectionFactory, the custom connection factory maintains a single JMS Connection which is re-used across every send/receive. Unlike Spring's SingleConnectionFactory, however, the connection returned is a Proxy that will attempt to re-use Session objects. The Session objects created by the Connection Proxy are also Proxies and will pool Producer/Consumer objects. Pooling all the way down allows for re-use of every JMS object that has been created. I created a simple example that does this pooling. Nothing too exotic, just a simple, and most likely bug filled ;-) library as shown below:
Custom Connection Factory that Creates a Single Connection:
public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
        TopicConnectionFactory, DisposableBean {
    private final ConnectionFactory delegate;
    private Connection singleConnection;
    private String clientID;
    private ConnectionInvocationHandler dynamicConnectionProxy;

    public SingleConnectionFactory(ConnectionFactory delegate) {
        this.delegate = delegate;
    }

    // Create the sole connection lazily; later calls return the same proxy
    private synchronized Connection createSingleConnection() throws JMSException {
        if (singleConnection != null) {
            return singleConnection;
        }
        Connection conn = delegate.createConnection();
        if (clientID != null) {
            conn.setClientID(clientID);
        }
        // The proxy exposes only the connection interfaces the provider's connection implements
        List<Class<? extends Connection>> classes = new ArrayList<Class<? extends Connection>>(3);
        classes.add(Connection.class);
        if (conn instanceof QueueConnection) {
            classes.add(QueueConnection.class);
        }
        if (conn instanceof TopicConnection) {
            classes.add(TopicConnection.class);
        }
        dynamicConnectionProxy = new ConnectionInvocationHandler(conn);
        singleConnection = (Connection) java.lang.reflect.Proxy.newProxyInstance(delegate.getClass()
                .getClassLoader(), (Class[]) classes.toArray(new Class[classes.size()]),
                dynamicConnectionProxy);
        return singleConnection;
    }

    public Connection createConnection() throws JMSException {
        return createSingleConnection();
    }
    ....
    .....
    public void destroy() throws Exception {
        synchronized (this) {
            JmsUtils.closeConnection(singleConnection);
            // The handler only exists once a connection has been created
            if (dynamicConnectionProxy != null) {
                dynamicConnectionProxy.destroy();
            }
        }
    }
}
Connection Invocation Handler that creates Session Proxies:
public class ConnectionInvocationHandler implements InvocationHandler, DisposableBean {
    private Connection delegate;
    // WSession is a custom interface that extends Session, QueueSession
    // and TopicSession
    private Pool<WSession> sessionPool;

    public ConnectionInvocationHandler(Connection delegate) {
        this.delegate = delegate;
        this.sessionPool = new Pool<WSession>();
    }

    public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
        if (isSessionCreationMethod(method)) {
            // Take a session proxy from the pool; the Creator builds a new one when needed
            return sessionPool.take(new Creator<WSession>() {
                public WSession create() {
                    try {
                        Session sessionDelegate = (Session) method.invoke(delegate, args);
                        if (sessionDelegate == null) {
                            throw new RuntimeException("Unable to Create Session Delegate");
                        }
                        // WSession's class loader can see both WSession and the JMS interfaces
                        WSession s = (WSession) Proxy.newProxyInstance(WSession.class.getClassLoader(),
                                new Class[] { WSession.class }, new SessionInvocationHandler(sessionDelegate,
                                        ConnectionInvocationHandler.this));
                        return s;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error creating", e);
                    }
                }
            });
        }
        else if (method.getName().equals("close")) {
            // Do not close delegate here.
            return null;
        }
        else if (method.getName().equals("setClientID")) {
            // Client ID is for the single connection only.
            String currentClientId = this.delegate.getClientID();
            if (currentClientId != null && currentClientId.equals(args[0])) {
                return null;
            }
            else {
                throw new javax.jms.IllegalStateException(
                        "setClientID method is not allowed on the proxy. Set it explicitly on the Factory instead. ");
            }
        }
        try {
            return method.invoke(delegate, args);
        }
        catch (InvocationTargetException e) {
            // Rethrow the provider's own exception (e.g. JMSException), not the reflection wrapper
            throw e.getTargetException();
        }
    }

    void releaseSession(WSession session) throws Exception {
        sessionPool.give(session);
    }
    ....
    ....
    /**
     * Shut down all open Connections, Sessions, Producers and Consumers.
     */
    public void destroy() throws Exception {
        sessionPool.destroy();
    }
}
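The Pool and Creator types used by the handlers are not shown in the listings. As a rough idea of what they might look like, here is a minimal sketch. Everything in it is an assumption made for illustration: the Destroyable interface, the unbounded size, and the rule that give() fails once the pool has been destroyed (which would fit the handlers falling back to closing the delegate when a release throws). It is not the actual wljmshelper code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical callback that builds a new pooled object on demand.
interface Creator<T> {
    T create();
}

// Hypothetical contract for pooled objects that can release their underlying resource.
interface Destroyable {
    void destroy() throws Exception;
}

// Minimal unbounded pool: idle objects are re-used, new ones are created lazily.
class Pool<T extends Destroyable> {
    private final Deque<T> idle = new ArrayDeque<T>();
    private final List<T> all = new ArrayList<T>();
    private boolean destroyed;

    // Hand out an idle object if there is one, otherwise ask the creator for a new one.
    public synchronized T take(Creator<T> creator) {
        if (destroyed) {
            throw new IllegalStateException("Pool has been destroyed");
        }
        T item = idle.poll();
        if (item == null) {
            item = creator.create();
            all.add(item);
        }
        return item;
    }

    // Return an object for later re-use; refused once the pool has been destroyed.
    public synchronized void give(T item) {
        if (destroyed) {
            throw new IllegalStateException("Pool has been destroyed");
        }
        idle.push(item);
    }

    // Destroy every object this pool ever created, idle or still handed out.
    public synchronized void destroy() {
        destroyed = true;
        idle.clear();
        for (T item : all) {
            try {
                item.destroy();
            }
            catch (Exception e) {
                // Best effort: keep destroying the remaining objects.
            }
        }
        all.clear();
    }
}
```

As far as the listings show, take() carries no key, so a pool like this hands back any idle instance. It would not tell apart producers for different destinations or sessions with different acknowledge modes.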
Session Invocation Handler that creates Producer and Consumer Proxies:
public class SessionInvocationHandler implements InvocationHandler {
    private Session delegate;
    private ConnectionInvocationHandler connectionProxy;
    private Pool<WSender> producerPool;
    private Pool<WConsumer> consumerPool;

    public SessionInvocationHandler(Session delegate, ConnectionInvocationHandler connectionProxy)
            throws JMSException {
        this.delegate = delegate;
        this.connectionProxy = connectionProxy;
        this.producerPool = new Pool<WSender>();
        this.consumerPool = new Pool<WConsumer>();
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (isSenderMethod(method)) {
            return getSender(method, args);
        }
        else if (isReceiverMethod(method)) {
            return getConsumer(method, args);
        }
        else if (method.getName().equals("close")) {
            // Hand the session proxy back to the connection's pool instead of closing the
            // delegate; close the delegate only if the release fails
            try {
                connectionProxy.releaseSession((WSession) proxy);
            }
            catch (Exception e) {
                JmsUtils.closeSession(delegate);
            }
            return null;
        }
        else if (method.getName().equals("destroy")) {
            // Really shut down: destroy pooled producers/consumers, then close the session
            producerPool.destroy();
            consumerPool.destroy();
            JmsUtils.closeSession(delegate);
            connectionProxy = null;
            return null;
        }
        try {
            return method.invoke(delegate, args);
        }
        catch (InvocationTargetException e) {
            // Rethrow the provider's own exception (e.g. JMSException), not the reflection wrapper
            throw e.getTargetException();
        }
    }

    // Assume duplicate of this code for Consumer
    WSender getSender(final Method method, final Object[] args) throws Throwable {
        return producerPool.take(new Creator<WSender>() {
            public WSender create() {
                try {
                    MessageProducer producerDelegate = (MessageProducer) method.invoke(delegate, args);
                    // WSender's class loader can see both WSender and the JMS interfaces
                    WSender s = (WSender) Proxy.newProxyInstance(WSender.class.getClassLoader(),
                            new Class[] { WSender.class }, new ProducerInvocationHandler(producerDelegate,
                                    SessionInvocationHandler.this));
                    return s;
                }
                catch (Exception e) {
                    throw new RuntimeException("Error Creating Sender", e);
                }
            }
        });
    }
    ....
    ....
    void release(WSender sender) throws Exception {
        producerPool.give(sender);
    }
    ...
}
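The handlers call isSessionCreationMethod, isSenderMethod and isReceiverMethod, none of which appear in the listings. One plausible way to write them is to match on the names of the standard JMS factory methods. The sketch below assumes exactly that; the JmsFactoryMethods class and the name-based overloads are hypothetical and only there to make the idea concrete and easy to try out.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: classifies JMS factory methods purely by method name.
final class JmsFactoryMethods {
    // Connection, QueueConnection and TopicConnection factory methods for sessions
    private static final Set<String> SESSION = new HashSet<String>(Arrays.asList(
            "createSession", "createQueueSession", "createTopicSession"));
    // Session, QueueSession and TopicSession factory methods for producers
    private static final Set<String> SENDER = new HashSet<String>(Arrays.asList(
            "createProducer", "createSender", "createPublisher"));
    // Session, QueueSession and TopicSession factory methods for consumers
    private static final Set<String> RECEIVER = new HashSet<String>(Arrays.asList(
            "createConsumer", "createReceiver", "createSubscriber", "createDurableSubscriber"));

    private JmsFactoryMethods() {
    }

    static boolean isSessionCreationMethod(Method method) {
        return isSessionCreationName(method.getName());
    }

    static boolean isSenderMethod(Method method) {
        return isSenderName(method.getName());
    }

    static boolean isReceiverMethod(Method method) {
        return isReceiverName(method.getName());
    }

    static boolean isSessionCreationName(String name) {
        return SESSION.contains(name);
    }

    static boolean isSenderName(String name) {
        return SENDER.contains(name);
    }

    static boolean isReceiverName(String name) {
        return RECEIVER.contains(name);
    }
}
```

Matching by name alone ignores the method arguments, so a check like this cannot by itself separate, say, a transacted session request from a non-transacted one.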
The Producer Invocation Handler (Consumer is similar):
public class ProducerInvocationHandler implements InvocationHandler {
    private MessageProducer delegate;
    private SessionInvocationHandler sessionProxy;

    public ProducerInvocationHandler(MessageProducer delegate, SessionInvocationHandler sessionProxy)
            throws JMSException {
        this.delegate = delegate;
        this.sessionProxy = sessionProxy;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getName().equals("close")) {
            // Hand the producer proxy back to the session's pool; close the delegate
            // only if the release fails
            try {
                sessionProxy.release((WSender) proxy);
            }
            catch (Exception e) {
                JmsUtils.closeMessageProducer(delegate);
            }
            return null;
        }
        else if (method.getName().equals("destroy")) {
            JmsUtils.closeMessageProducer(delegate);
            sessionProxy = null;
            return null;
        }
        try {
            return method.invoke(delegate, args);
        }
        catch (InvocationTargetException e) {
            // Rethrow the provider's own exception (e.g. JMSException), not the reflection wrapper
            throw e.getTargetException();
        }
    }
}
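All three handlers rely on the same trick: close() on the proxy does not close the delegate but parks the proxy for re-use. To see that in isolation, without a JMS provider, here is a small self-contained sketch. The Sender interface, RealSender and SenderPool are hypothetical stand-ins invented for the example, not part of the library.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a JMS artifact such as a MessageProducer.
interface Sender {
    String send(String text);
    void close();
}

// The "real" object that is assumed to be expensive to create.
class RealSender implements Sender {
    static int created = 0;
    boolean closed = false;

    RealSender() {
        created++;
    }

    public String send(String text) {
        return "sent:" + text;
    }

    public void close() {
        closed = true;
    }
}

// Hands out proxies whose close() parks them for re-use instead of closing the delegate.
class SenderPool {
    private final Deque<Sender> idle = new ArrayDeque<Sender>();

    synchronized Sender take() {
        Sender pooled = idle.poll();
        return pooled != null ? pooled : newProxy(new RealSender());
    }

    private synchronized void give(Sender proxy) {
        idle.push(proxy);
    }

    private Sender newProxy(final Sender delegate) {
        InvocationHandler handler = new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getName().equals("close")) {
                    // Intercept close(): return the proxy to the pool, leave the delegate open
                    give((Sender) proxy);
                    return null;
                }
                try {
                    return method.invoke(delegate, args);
                }
                catch (InvocationTargetException e) {
                    // Surface the delegate's own exception, not the reflection wrapper
                    throw e.getTargetException();
                }
            }
        };
        return (Sender) Proxy.newProxyInstance(Sender.class.getClassLoader(),
                new Class<?>[] { Sender.class }, handler);
    }
}
```

Taking a Sender, closing it and taking again yields the very same proxy, with only one RealSender ever constructed. That is the effect the JmsTemplate gets from the pooled factory, since it closes its Session and Producer after every send.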
A Sample Unit Test:
@Test
public void testWithoutSpring() throws JMSException {
    Connection conn = connectionFactory.createConnection();
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(queue);
    StopWatch w = new StopWatch();
    w.start();
    for (int i = 0; i < MAX_MESSAGES; i++) {
        producer.send(messageCreator.createMessage(session));
    }
    w.stop();
    System.out.println("Sans Spring Jms Template:" + w.getTotalTimeMillis());
}

@Test
public void testSpringJmsTemplate() {
    System.out.println("Spring Jms Template Without Single Connection Factory:" + executeTemplateTest(connectionFactory));
}

@Test
public void testSpringJmsTemplateSingleConnFactory() {
    org.springframework.jms.connection.SingleConnectionFactory f
            = new org.springframework.jms.connection.SingleConnectionFactory(connectionFactory);
    System.out.println("Spring Jms Template with Single Connection Factory:" + executeTemplateTest(f));
}

@Test
public void testPoolingFactory() throws Exception {
    SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(connectionFactory);
    System.out.println("Spring JmsTemplate With Pooled Custom Factory:" + executeTemplateTest(singleConnectionFactory));
    singleConnectionFactory.destroy();
}

private long executeTemplateTest(ConnectionFactory connectionFactory) {
    JmsTemplate template = new JmsTemplate(connectionFactory);
    template.setDefaultDestination(queue);
    StopWatch watch = new StopWatch();
    watch.start();
    for (int i = 0; i < MAX_MESSAGES; i++) {
        template.send(messageCreator);
    }
    watch.stop();
    return watch.getTotalTimeMillis();
}
Each of the above tests clears the JMS Queue of all messages before starting. In addition, the cost of obtaining JMS resources such as the Connection Factory and Queue is moved into the "before" method of the test. When I ran the tests against my local WebLogic installation, I found the following results, with all times measured in milliseconds:
100 Messages Sent:
Sans Spring Jms Template:109
Spring Jms Template Without Single Connection Factory:1763
Spring Jms Template with Single Connection Factory:1025
Spring JmsTemplate With Pooled Custom Factory:207
1000 Messages Sent:
Sans Spring Jms Template:1239
Spring Jms Template Without Single Connection Factory:18049
Spring Jms Template with Single Connection Factory:6135
Spring JmsTemplate With Pooled Custom Factory:2326
5000 Messages Sent:
Sans Spring Jms Template:6618
Spring Jms Template Without Single Connection Factory:52916
Spring Jms Template with Single Connection Factory:25944
Spring JmsTemplate With Pooled Custom Factory:8894
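To make the totals easier to compare, they can be turned into a per-message cost. The snippet below is just arithmetic on the 5000-message figures reported above; the PerMessageCost class and its method name are made up for the illustration.

```java
import java.util.Locale;

// Converts the reported totals of the 5000-message run into milliseconds per message.
class PerMessageCost {
    static double perMessageMillis(long totalMillis, int messages) {
        return (double) totalMillis / messages;
    }

    public static void main(String[] args) {
        int messages = 5000;
        String[] labels = {
            "Sans Spring Jms Template",
            "Spring Jms Template Without Single Connection Factory",
            "Spring Jms Template with Single Connection Factory",
            "Spring JmsTemplate With Pooled Custom Factory"
        };
        // Totals in milliseconds, copied from the results listed above
        long[] totals = { 6618, 52916, 25944, 8894 };
        for (int i = 0; i < labels.length; i++) {
            System.out.println(String.format(Locale.ROOT, "%-55s %6.2f ms/message",
                    labels[i], perMessageMillis(totals[i], messages)));
        }
    }
}
```

For the 5000-message run this works out to roughly 1.3 ms per message without the JmsTemplate, 10.6 ms with the plain provider factory, 5.2 ms with Spring's SingleConnectionFactory and 1.8 ms with the pooled custom factory. In that run the pooled factory is about a third slower than hand-written JMS and close to three times faster than Spring's SingleConnectionFactory.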
So here lies my take on using JmsTemplate with WebLogic. Try not to use it but instead obtain Connection, Session, Producer/Consumers and reuse them. If you have to use JmsTemplate and are not sending many messages, then use the SingleConnectionFactory provided by Spring in conjunction with JmsTemplate.
If many messages are being sent in succession then creating a custom factory that pools Session and Producer/Consumer objects would be the way to go IMO.
The sample code that I have does not handle idle timeouts for the JMS artifacts. Building that into the proxies should be a trivial effort. Beyond the above optimization, one can also build re-connect logic for the Connection, Session and Producer/Consumers into the proxies. WebLogic's transparent reconnect logic cannot always be relied upon when a server in a cluster fails (look at the OTN forums).
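As an idea of what idle time-out handling could look like, here is a minimal sketch of a pool that remembers when each object was returned and disposes of the ones that have been idle for too long. IdleAwarePool and Disposer are hypothetical names, and the current time is passed in as a parameter purely to keep the example deterministic; real code would more likely read System.currentTimeMillis() and might evict from a background timer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical callback that really closes an evicted object (e.g. via JmsUtils).
interface Disposer<T> {
    void dispose(T item);
}

// Pool of idle objects that drops entries which have been idle longer than maxIdleMillis.
class IdleAwarePool<T> {
    private static final class Entry<E> {
        final E item;
        final long idleSince;

        Entry(E item, long idleSince) {
            this.item = item;
            this.idleSince = idleSince;
        }
    }

    private final Deque<Entry<T>> idle = new ArrayDeque<Entry<T>>();
    private final long maxIdleMillis;
    private final Disposer<T> disposer;

    IdleAwarePool(long maxIdleMillis, Disposer<T> disposer) {
        this.maxIdleMillis = maxIdleMillis;
        this.disposer = disposer;
    }

    // Park an object and remember when it became idle.
    synchronized void give(T item, long nowMillis) {
        idle.push(new Entry<T>(item, nowMillis));
    }

    // Return an idle object that has not timed out, or null if there is none.
    synchronized T takeIdle(long nowMillis) {
        evictExpired(nowMillis);
        Entry<T> entry = idle.poll();
        return entry == null ? null : entry.item;
    }

    // Dispose of everything that has been idle for longer than the limit.
    synchronized int evictExpired(long nowMillis) {
        int evicted = 0;
        for (Iterator<Entry<T>> it = idle.iterator(); it.hasNext();) {
            Entry<T> entry = it.next();
            if (nowMillis - entry.idleSince > maxIdleMillis) {
                it.remove();
                disposer.dispose(entry.item);
                evicted++;
            }
        }
        return evicted;
    }
}
```

A caller that gets null back from takeIdle would simply create a fresh object, much like the Creator callback does in the listings above.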
I would be interested to know how the JmsTemplate performs with other JMS Providers. In addition, if my bearings are totally off, I would like to know that as well ;-) And please, I am talking of this blog only ;-)
wljmshelper is the library that contains the code for the above and can be downloaded from HERE. The project is a Maven 2 project, and one should be able to compile it as it only references the JMS API; there are no WebLogic specifics needed for compilation. However, running the tests requires the WebLogic client, i.e., wlfullclient.jar, and of course a WebLogic installation :-)
Disclaimer:
The above mentioned code is an experiment. It has not undergone testing, is not even open source, and was created purely for demonstration.