Friday, April 24, 2009

Browsing WebLogic JMS Distributed Queues

One of the interesting features of WebLogic is that when a javax.jms.QueueBrowser is created, it is pinned to a particular physical member of a distributed destination. This means that a QueueBrowser created for a distributed destination can only browse the messages on the member it is pinned to, not the whole queue.

One can try to create X queue browsers, where X is the number of servers in the provider URL (for example, "t3://localhost:8002, localhost:8003"), and hope that no two queue browsers are pinned to the same member; in other words, one is at the mercy of WebLogic's round-robin distribution. Even if that works, one still needs to iterate over X separate enumerations to inspect the messages. Consider an integration test that sends a message to a distributed destination and then has to create X queue browsers to determine whether the message is present.

What would be really nice is if the WebLogic runtime gave us a QueueBrowser that provided an enumeration of enumerations over the physical members of a distributed queue. No such browser exists, however, so I decided to create a simple example that does this. It was also a chance to understand the JMX capabilities of WebLogic.

The example will create a QueueBrowser that obtains an enumeration from each of the physical members of the distributed destination and returns a single enumeration over the respective enumerations. For uniformly created distributed destinations, the JNDI name of each physical member has the format JMSServerName@JNDIName. If one knows the JMS server names, that's great. However, one should not have to rely on them, as JMS servers might be added or removed. It would be nice if the member JNDI names could be located on the fly using WebLogic's JMX capabilities, and that's what I intend to do. Note that the enumerations obtained from WebLogic are snapshots of the current state of the queue and are not dynamic; that is, messages added later are not reflected in the enumeration. Finally, I am providing a java.util.Enumeration of the JMS messages instead of a java.util.Iterator, because Enumeration does not have a "remove" method (I hate throwing an UnsupportedOperationException from an Iterator) and because I would like to stay true to the JMS API.
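
To make the naming convention concrete, here is a minimal sketch of composing a member JNDI name and taking it apart again. The class name MemberJndiNames and the server name JMSServer1 are made up for illustration; only the JMSServerName@JNDIName format comes from the description above.

```java
final class MemberJndiNames {

    private MemberJndiNames() {
    }

    /** Builds "JMSServerName@JNDIName" for one member of a uniform distributed destination. */
    static String memberName(String jmsServerName, String destinationJndiName) {
        return jmsServerName + "@" + destinationJndiName;
    }

    /** Returns the JMS server part, i.e. everything before the first '@'. */
    static String jmsServerOf(String memberJndiName) {
        int at = memberJndiName.indexOf('@');
        if (at < 0) {
            throw new IllegalArgumentException("Not a member JNDI name: " + memberJndiName);
        }
        return memberJndiName.substring(0, at);
    }
}
```

With these helpers, memberName("JMSServer1", "test_queue") yields "JMSServer1@test_queue", and jmsServerOf applied to that string gives back "JMSServer1".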

First, we need to obtain the JNDI names of the physical members of a distributed queue. WebLogic exposes its JMX MBeans as a tree of objects. Starting from the ServerRuntimeMBeans, we can reach the JMSRuntimeMBeans, then the different JMS servers, and finally the names of those JMS servers, from which the member JNDI names are built, as shown below:



import java.util.Arrays;
import java.util.Hashtable;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.naming.Context;

public class WeblogicMBeanHelper {
    private final MBeanServerConnection connection;
    private final JMXConnector connector;
    private final ObjectName service;

    public WeblogicMBeanHelper(String url, String userName, String password) {
        try {
            // Entry point of the Domain Runtime MBean Server
            service = new ObjectName(
                "com.bea:Name=DomainRuntimeService,Type=weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean");
        }
        catch (MalformedObjectNameException e) {
            throw new AssertionError(e.getMessage());
        }
        ......
        ......
        JMXServiceURL serviceURL;

        try {
            serviceURL = new JMXServiceURL("t3", hostName, Integer.valueOf(port), jndiroot + mserver);

            Hashtable<String, String> h = new Hashtable<String, String>();
            h.put(Context.SECURITY_PRINCIPAL, userName);
            h.put(Context.SECURITY_CREDENTIALS, password);
            h.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "weblogic.management.remote");

            connector = JMXConnectorFactory.connect(serviceURL, h);
            connection = connector.getMBeanServerConnection();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Builds JMSServerName@JNDIName for every JMS server found via JMX
    public Iterable<String> getDistributedMemberJndiNames(String distributedDestJndiName) {
        Iterable<String> serverNames = getJmsServerNames();
        Set<String> distributedDestNames = new TreeSet<String>();

        for (String serverName : serverNames) {
            distributedDestNames.add(serverName + "@" + distributedDestJndiName);
        }

        return distributedDestNames;
    }

    public Iterable<String> getJmsServerNames() {
        .....
    }

    public Iterable<ObjectName> getJMSServers() {
        ....
    }

    public Iterable<ObjectName> getJMSRuntimes() {
        ....
    }

    public List<ObjectName> getServerRuntimeMBeans() {
        try {
            return Arrays.asList((ObjectName[]) connection.getAttribute(service, "ServerRuntimes"));
        }
        catch (Exception e) {
            throw new RuntimeException("Error obtaining Server Runtime Information", e);
        }
    }

    public void close() {
        // Close the connector
        ....
    }
}
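
The bodies of getJmsServerNames, getJMSServers and getJMSRuntimes are elided above. The following is only a sketch of how such a traversal might look, not the code from the download. It assumes the attribute names JMSRuntime, JMSServers and Name from WebLogic's runtime MBean model, and the JNDI path /jndi/weblogic.management.mbeanservers.domainruntime for the Domain Runtime MBean Server; both should be verified against the WebLogic version in use. The class name JmsServerNameLookup and its method names are hypothetical.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXServiceURL;

final class JmsServerNameLookup {

    // Assumption: JNDI path of WebLogic's Domain Runtime MBean Server.
    static final String DOMAIN_RUNTIME_PATH =
            "/jndi/weblogic.management.mbeanservers.domainruntime";

    private JmsServerNameLookup() {
    }

    /** Turns an admin URL such as "t3://localhost:7001" into a JMX service URL. */
    static JMXServiceURL toServiceUrl(String adminUrl) throws Exception {
        URI uri = URI.create(adminUrl);
        return new JMXServiceURL(uri.getScheme(), uri.getHost(), uri.getPort(), DOMAIN_RUNTIME_PATH);
    }

    /**
     * Walks the MBean tree: service -> ServerRuntimes -> JMSRuntime -> JMSServers -> Name.
     * The attribute names below ServerRuntimes are assumptions, see the text above.
     */
    static List<String> jmsServerNames(MBeanServerConnection connection, ObjectName service)
            throws Exception {
        List<String> names = new ArrayList<>();
        ObjectName[] serverRuntimes =
                (ObjectName[]) connection.getAttribute(service, "ServerRuntimes");

        for (ObjectName serverRuntime : serverRuntimes) {
            ObjectName jmsRuntime =
                    (ObjectName) connection.getAttribute(serverRuntime, "JMSRuntime");
            ObjectName[] jmsServers =
                    (ObjectName[]) connection.getAttribute(jmsRuntime, "JMSServers");

            for (ObjectName jmsServer : jmsServers) {
                names.add((String) connection.getAttribute(jmsServer, "Name"));
            }
        }
        return names;
    }
}
```

Because the traversal only depends on the MBeanServerConnection interface, it can be exercised against a stub connection without a running server.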









Now that we have a way of obtaining the distributed members, the following QueueBrowser enumerates over the different members and consolidates the results into a single enumeration. A second enumeration additionally provides the JMS server that hosts each message:



public class DistribuedQueueBrowser {
    ...
    private final Iterable<String> queueNames;

    public DistribuedQueueBrowser(String adminUrl, String providerUrl,
            String distributedDestinationName, String userName, String password) throws Exception {
        ctx = getInitialContext(providerUrl, userName, password);
        WeblogicMBeanHelper helper = null;

        try {
            helper = new WeblogicMBeanHelper(adminUrl, userName, password);
            queueNames = helper.getDistributedMemberJndiNames(distributedDestinationName);
        }
        finally {
            if (helper != null) {
                helper.close();
            }
        }

        // Set up connection/session..
        ....
    }

    private InitialContext getInitialContext(String providerUrl, String userName, String password) throws Exception {
        ....
    }

    // Return an Enumeration of Messages only
    public Enumeration<Message> getEnumeration() throws JMSException, NamingException {
        return new JmsMessageEnumeration(getMessageEnumeratorMap());
    }

    // Maps each JMS server name to the message enumeration of its queue member
    @SuppressWarnings("unchecked")
    private Map<String, Enumeration<Message>> getMessageEnumeratorMap() throws JMSException,
            NamingException {
        Map<String, Enumeration<Message>> serverMessageMap = new HashMap<String, Enumeration<Message>>();

        for (String queueName : queueNames) {
            // queueName has the form JMSServerName@JNDIName
            String serverDq[] = StringUtils.split(queueName, "@");
            Queue queue = (Queue) ctx.lookup(queueName);
            javax.jms.QueueBrowser qb = session.createBrowser(queue);

            serverMessageMap.put(serverDq[0], qb.getEnumeration());
        }

        return serverMessageMap;
    }

    // Return an enumeration of ServerLocatedMessage that contains
    // the JMS server that houses the message
    public Enumeration<ServerLocatedMessage> getServerLocatedEnumeration() throws JMSException,
            NamingException {
        return new ServerLocatedMessageEnumeration(getMessageEnumeratorMap());
    }

    public static class ServerLocatedMessage {
        private final Message message;
        private final String jmsServerName;

        public ServerLocatedMessage(String jmsServerName, Message message) {
            this.message = message;
            this.jmsServerName = jmsServerName;
        }
        ...
    }

    private static abstract class AbstractMessageEnumeration<T> implements Enumeration<T> {
        Map.Entry<String, Enumeration<Message>> current;
        private Enumeration<Message> currMessageEnumer;
        private final Iterator<Map.Entry<String, Enumeration<Message>>> iterator;

        public AbstractMessageEnumeration(Map<String, Enumeration<Message>> map) {
            iterator = map.entrySet().iterator();
            current = iterator.hasNext() ? iterator.next() : null;
            currMessageEnumer = current != null
                ? current.getValue()
                : new Enumeration<Message>() {
                    public boolean hasMoreElements() {
                        return false;
                    }

                    public Message nextElement() {
                        throw new NoSuchElementException();
                    }
                };
        }

        Enumeration<Message> getEnumeration() {
            if (current == null || currMessageEnumer.hasMoreElements()) {
                return currMessageEnumer;
            }

            // Current member is exhausted; move on to the next member that has messages
            while (iterator.hasNext()) {
                current = iterator.next();
                currMessageEnumer = current.getValue();

                if (currMessageEnumer.hasMoreElements()) {
                    return currMessageEnumer;
                }
            }

            return currMessageEnumer;
        }

        public boolean hasMoreElements() {
            return getEnumeration().hasMoreElements();
        }
    }

    // Wraps the JMS Message within a ServerLocatedMessage object
    // which contains the server name along with the message
    private static class ServerLocatedMessageEnumeration extends
            AbstractMessageEnumeration<ServerLocatedMessage> {

        public ServerLocatedMessageEnumeration(Map<String, Enumeration<Message>> map) {
            super(map);
        }

        public ServerLocatedMessage nextElement() {
            Message message = getEnumeration().nextElement();
            return new ServerLocatedMessage(current.getKey(), message);
        }
    }

    private static class JmsMessageEnumeration extends AbstractMessageEnumeration<Message> {

        public JmsMessageEnumeration(Map<String, Enumeration<Message>> map) {
            super(map);
        }

        public Message nextElement() {
            return getEnumeration().nextElement();
        }
    }

    public void close() {
        ......
    }
}
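
The consolidation done by AbstractMessageEnumeration can also be looked at in isolation. The sketch below chains plain java.util enumerations into one, skipping empty sources, without any JMS or WebLogic classes. ChainedEnumeration is a hypothetical name, and in a test one would typically feed it strings or other stand-ins instead of JMS messages; it illustrates the technique and is not a drop-in replacement for the class above.

```java
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Presents several enumerations as a single one, skipping over empty sources. */
final class ChainedEnumeration<T> implements Enumeration<T> {

    private final Iterator<Enumeration<T>> sources;

    // Starts out empty so that hasMoreElements() pulls in the first source.
    private Enumeration<T> current = Collections.emptyEnumeration();

    ChainedEnumeration(List<Enumeration<T>> sources) {
        this.sources = sources.iterator();
    }

    @Override
    public boolean hasMoreElements() {
        // Advance until a source with remaining elements is found or all are used up.
        while (!current.hasMoreElements() && sources.hasNext()) {
            current = sources.next();
        }
        return current.hasMoreElements();
    }

    @Override
    public T nextElement() {
        if (!hasMoreElements()) {
            throw new NoSuchElementException();
        }
        return current.nextElement();
    }
}
```

Like the original, this relies on each source being a snapshot: elements are consumed source by source, and nothing added to a queue member afterwards shows up.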



Using this QueueBrowser is rather straightforward:



// Note that the first argument is the admin URL and the second is the
// managed server URL.
DistribuedQueueBrowser qb = new DistribuedQueueBrowser("t3://localhost:7001",
    "t3://localhost:8001",
    "test_queue", "weblogic", "weblogic");

// Messages only
Enumeration<Message> i = qb.getEnumeration();
while (i.hasMoreElements()) {
    Message m = i.nextElement();
    System.out.println("Message:" + m);
}

// Messages together with the JMS server that hosts them
Enumeration<ServerLocatedMessage> sli = qb.getServerLocatedEnumeration();
while (sli.hasMoreElements()) {
    ServerLocatedMessage m = sli.nextElement();
    System.out.println(m);
}

qb.close();
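
For the integration-test scenario mentioned at the beginning, the check usually boils down to "is there a message matching some condition?". A small generic helper along the following lines could answer that over any Enumeration, including the one returned by getEnumeration(). EnumerationSearch and anyMatch are hypothetical names, and the matching condition (for example a comparison on a message ID or property) is left to the caller.

```java
import java.util.Enumeration;
import java.util.function.Predicate;

final class EnumerationSearch {

    private EnumerationSearch() {
    }

    /** Returns true as soon as one element satisfies the condition; consumes elements up to that point. */
    static <T> boolean anyMatch(Enumeration<T> elements, Predicate<? super T> condition) {
        while (elements.hasMoreElements()) {
            if (condition.test(elements.nextElement())) {
                return true;
            }
        }
        return false;
    }
}
```

Since the enumeration is a snapshot, a test that has just sent a message should create the browser's enumeration after the send, not before.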



The code for this post is available HERE. The zip file contains only the Java source files; one needs to compile them against WebLogic's client JARs before using them. Let me know if you have problems running the code or if you have any tips regarding it.
Later...

3 comments:

Anonymous said...

Sanjay thanks for the informative post. I was looking for a good way to browse a distributed queue and happy to find your description.

Steve

Ladislav Jech said...

Great piece of code, I am just creating some testing routines, where I need to stop OSB proxy service (which is already working) which is consuming the queue and then post message to the queue and browse the queue for this message as part of the test. As soon as pre-prod and uat environments are using distributed queues, this helped me a lot. Thanks!
Ladislav

Anonymous said...

Thank you so much for posting this, it's been really helpful for me. Thanks.
Javier