Wednesday, December 28, 2011

WebLogic JMS Partitioned Distributed Topics and Shared Subscriptions

I had previously blogged about the challenges of scaling topics with durable subscribers. In JMS, with topics, one can typically have a single topic subscriber for a particular [Client ID, Subscription Name] tuple.

In the example shown below, the Order Topic has two durable subscribers, the Order Processor and the Auditor. Both of these get the messages sent to the topic, but at any given time there can be exactly one of each consuming a message. There cannot be competing consumers on a durable subscription as in the case of queues. This translates to a scalability and availability challenge:
Oracle WebLogic has introduced the concepts of a Partitioned Distributed Topic and shared subscriptions, which allow more than one durable subscriber to be attached to a particular subscription. With a shared subscription, consumers belonging to the same "application" compete for messages from a Topic, and this feature promotes scalability and availability. In the figure shown below there are two Order Processors competing for messages, and likewise two Auditors. At no time will a message be delivered to both consumers of the same "application":

WebLogic introduces the concept of an UNRESTRICTED Client ID policy. With this setting on the connection factory, more than one connection in a cluster can share the same Client ID. The standard option of RESTRICTED enforces that only a single connection with a particular Client ID can exist in a cluster.

Sharing Subscriptions:
There are two policies for Subscription sharing:

1. Exclusive - This is the default and all subscribers created using the connection factory cannot share subscriptions with any other subscribers.

2. Sharable - Subscribers created using this connection factory can share their subscriptions with other subscribers. Consumers can share a durable subscription if they have the same [Client ID, Client ID policy and subscription name].

To promote HA, set the Subscription Sharing Policy of the connection factory to Sharable.
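
With those two settings in place, the client side is plain JMS. Below is a minimal sketch of what each Order Processor instance could run; the JNDI names and provider URL are illustrative and not taken from any sample. Because the connection factory allows an unrestricted Client ID and sharable subscriptions, several instances can run this same code with the same Client ID and subscription name and compete for the topic's messages:
import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

public class OrderProcessorClient {
  public static void main(String[] args) throws Exception {
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
    env.put(Context.PROVIDER_URL, "t3://localhost:7001"); // illustrative cluster URL

    Context ctx = new InitialContext(env);
    ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("jms/orderConnectionFactory"); // illustrative JNDI name
    Topic orderTopic = (Topic) ctx.lookup("jms/orderTopic"); // illustrative JNDI name

    Connection connection = connectionFactory.createConnection();
    // Every Order Processor instance uses the same Client ID...
    connection.setClientID("orderConnectionId");

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // ...and the same durable subscription name, so the subscription is shared
    TopicSubscriber subscriber = session.createDurableSubscriber(orderTopic, "orderProcSubscription");

    subscriber.setMessageListener(new MessageListener() {
      public void onMessage(Message message) {
        System.out.println("Order processor got a message...");
      }
    });
    connection.start();
  }
}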

When creating a Topic in WebLogic, set the Forwarding Policy to Partitioned. This causes a message sent to a partitioned distributed topic to be sent to a single physical member. In addition, the message will not be forwarded to other members of the cluster even if there are no consumers on that physical member.

If we want an HA solution, the listeners will need to connect to each and every physical member across the cluster. Consider the following figure: on Consumer Machine 1 there are two consumers of Type A and two consumers of Type B, and the same applies to Consumer Machine 2. It is important to note that consumers of a given type, or application, connect to both physical members of the cluster. Doing so ensures that if one consumer machine dies unexpectedly, the other can continue to function, ensuring availability:


The above can be achieved using Spring Framework's Message Listener Container with some wrapper code, as shown below, where the PartitionedTopicContainer is a container of containers, each connecting to a physical member of the topic with the same client ID and subscription name:
public class PartitionedTopicContainer {
  private final List<DefaultMessageListenerContainer> containers;

  public PartitionedTopicContainer(String clientID, String subscriptionName, ConnectionFactory connectionFactory, Destination ...physicalTopicMembers) {
    this.containers = Lists.newArrayList();

    for (Destination physicalMember : physicalTopicMembers) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

      container.setConnectionFactory(connectionFactory);
      container.setDestination(physicalMember);
      container.setClientId(clientID);
      container.setDurableSubscriptionName(subscriptionName);

      // Remember the container so it can be started and shut down with the rest
      containers.add(container);
    }
  }

  public void setMessageListener(Object listener) {
    for (DefaultMessageListenerContainer container : containers) {
      container.setMessageListener(listener);
    }
  }

  public void start() {
    for (DefaultMessageListenerContainer container : containers) {
      container.start();
    }
  }

  public void shutdown() {
    for (DefaultMessageListenerContainer container : containers) {
      container.shutdown();
    }
  }
}
The above container could then be used as follows:
// Obtain each physical member of the partitioned distributed topic
Destination physicalMember1 = (Destination) context.lookup("Server1@orderTopic");
Destination physicalMember2 = (Destination) context.lookup("Server2@orderTopic");

// Container for the Order Processor
PartitionedTopicContainer subscriber1 = new PartitionedTopicContainer("orderConnectionId", "orderProcSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber1.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber order processor got a message...");
    ...
  }
});
// Container for the Auditor
PartitionedTopicContainer subscriber2 = new PartitionedTopicContainer("auditorConnectionId", "auditorSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber2.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber auditor got a message...");
    ...
  }
});

subscriber1.start();
subscriber2.start();


The part of the code above where a consumer of the API has to look up each and every physical member and provide them to the container is a lot of boilerplate and does not account well for cases where a physical member becomes available or unavailable. Luckily, WebLogic provides the JMSDestinationAvailabilityHelper API, which is a way to listen for events relating to physical member availability and unavailability. The PartitionedTopicContainer shown above could easily be augmented with the availability helper API to get notified of physical destination availability and unavailability, and correspondingly start or stop the internal container for that physical destination. Pseudo-code of how this can be achieved with the above container is shown below:

public class PartitionedTopicContainer implements DestinationAvailabilityListener {
  private final String partDistTopicJndi;

  private final ConnectionFactory connectionFactory;

  private final String clusterUrl;

  private final String clientID;

  private final String subscriptionName;

  @GuardedBy("containerLock")
  private final Map<String, DefaultMessageListenerContainer> containerMap;

  private final Object containerLock = new Object();

  // WebLogic handle returned on registration with the availability helper
  private RegistrationHandle registrationHandle;

  private final CountDownLatch startLatch = new CountDownLatch(1);

  public PartitionedTopicContainer(String clientID, String subscriptionName, String clusterUrl,
    ConnectionFactory connectionFactory, String partDistTopicJndi) {
    this.clusterUrl = clusterUrl;
    this.clientID = clientID;
    this.subscriptionName = subscriptionName;
    this.partDistTopicJndi = partDistTopicJndi;
    this.containerMap = Maps.newHashMap();
    this.connectionFactory = connectionFactory;
  }

  public void start() throws InterruptedException {
    Hashtable<String, String> jndiProperties = ...;
    jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
    jndiProperties.put(Context.PROVIDER_URL, clusterUrl);
    
    JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
   
    // Register this container as a listener for destination events
    registrationHandle = dah.register(jndiProperties, partDistTopicJndi, this);
   
    // Wait for listener notification to start container
    startLatch.await();
  }

  @Override
  public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
    synchronized (containerLock) {
      // For all Physical destinations, start a container
      for (DestinationDetail detail : physicalAvailableMembers) {
        Destination physicalMember = lookupPhysicalTopic(detail.getJNDIName());
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        
        container.setConnectionFactory(connectionFactory);
        container.setDestination(physicalMember);
        container.setClientId(clientID);
        container.setDurableSubscriptionName(subscriptionName);
        System.out.println("Starting Container for physical Destination:" + detail);
        container.start();
        containerMap.put(detail.getJNDIName(), container);
      }
    }
    startLatch.countDown();
  }
  
  @Override
  public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
    synchronized (containerLock) {
      // Shutdown all containers whose physical members are no longer available
      for (DestinationDetail detail : physicalUnavailableMembers) {
        DefaultMessageListenerContainer container = containerMap.remove(detail.getJNDIName());
        container.shutdown();
      }
    }
  }

  @Override
  public void onFailure(String destJndiName, Exception exception) {
    // Looks like a cluster wide failure
    shutdown();
  }

  public void shutdown() {
    // Unregister for events about destination availability
    registrationHandle.unregister();

    // Shut down containers
    synchronized (containerLock) {
      for (Iterator<Map.Entry<String, DefaultMessageListenerContainer>> i = containerMap.entrySet().iterator(); i.hasNext();) {
        Map.Entry<String, DefaultMessageListenerContainer> entry = i.next();
        System.out.println("Shutting down container for:" + entry.getKey());
        entry.getValue().shutdown();
        i.remove();
      }
    }  
  }
}   

Some of the things to remember when creating a partitioned distributed topic and shared subscription:

1. The Connection Factory being used should have its "Subscription Sharing Policy" set to "Sharable"
2. The Forwarding Policy on the Partitioned Distributed Topic should be set to "Partitioned"
3. Message forwarding will not occur, so subscribers must ensure connections exist to every physical member, else messages can pile up for the subscription on that member
4. If a server hosting a physical member is unavailable, then messages from that physical topic will be unavailable until the server is available again.

Partitioned Distributed Topics and Shared Subscriptions look promising. One thing I still need to sort out is how one handles error destinations at a per-subscription level with WebLogic. Any passer-by with thoughts, please do shoot them my way.

Tuesday, October 4, 2011

Jersey JAX-RS and JAXB Schema Validation

This BLOG is about Jersey web services and Schema based validation with the same. I have also been playing with Google Guice (I hear it, Spring Traitor, err not quite) :-) and figured I'd use Guice instead of Spring for once.

When un-marshalling XML using JAXB, a schema facilitates stricter validation. The Jersey recommended approach to enable schema-based validation is to create a javax.ws.rs.ext.ContextResolver. I have seen examples using a javax.ws.rs.ext.MessageBodyReader as well. The code demonstrated here is largely based on, and influenced by, the discussion on XSD validation between Andrew Cole and Paul Sandoz. The goals of the resolver are:
  1. Enable schema based validation if desired
  2. Provide the ability to enable a custom validation event handler
  3. Enable formatted JAXB and the ability to set the character encoding
It is said that JAXB contexts are better off cached as they are expensive to create. I am only going by what I have read in different posts and/or discussions and do not have any metrics to back the claim. A generic JAXB context resolver that accomplishes the above is shown here:
public class JaxbContextResolver implements ContextResolver<JAXBContext> {
  static final ConcurrentMap<String, JAXBContext> CONTEXT_MAP = new MapMaker()
      .makeComputingMap(new Function<String, JAXBContext>() {

        @Override
        public JAXBContext apply(String fromPackage) {
          try {
            return JAXBContext.newInstance(fromPackage);
          }
          catch (JAXBException e) {
            throw new RuntimeException("Unable to create JAXBContext for Package:" + fromPackage, e);
          }
        }
      });
  
  private Schema schema;
  private ValidationEventHandler validationEventHandler;
  
  public JaxbContextResolver withSchema(Schema schema) {
    this.schema = schema;
    return this;
  }
  ...
  public JaxbContextResolver withValidationEventHandler(ValidationEventHandler validationEventHandler) {
    this.validationEventHandler = validationEventHandler;
    return this;
  }
  
  @Override
  public JAXBContext getContext(Class<?> type) {
    return new ValidatingJAXBContext(CONTEXT_MAP.get(type.getPackage().getName()),
      schema, formattedOutput, encoding, validationEventHandler);
  }
   ...
  public static class ValidatingJAXBContext extends JAXBContext {
    private final JAXBContext contextDelegate;
     ....
    private final ValidationEventHandler validationEventHandler;

    @Override
    public javax.xml.bind.Unmarshaller createUnmarshaller() throws JAXBException {
      javax.xml.bind.Unmarshaller unmarshaller = contextDelegate.createUnmarshaller();
      
      // Set the Validation Handler
      if (validationEventHandler != null) {
        unmarshaller.setEventHandler(validationEventHandler);
      }
      
      // Set the Schema
      if (validatingSchema != null) {
        unmarshaller.setSchema(validatingSchema);
      }

      return unmarshaller;
    }

    @Override
    public javax.xml.bind.Marshaller createMarshaller() throws JAXBException {
      javax.xml.bind.Marshaller m = contextDelegate.createMarshaller();
      m.setProperty("jaxb.formatted.output", formattedOutput);

      if (encoding != null) {
        m.setProperty("jaxb.encoding", encoding);
      }

      return m;
    }

    @Override
    public Validator createValidator() throws JAXBException {
      return contextDelegate.createValidator();
    }
   }
 }
The Context resolver itself is registered as a Singleton with Jersey in an Application class. For example:
public class NotesApplication extends Application {
  private Set<Object> singletons;
  
  public NotesApplication() {
   ....
    Schema schema = null;
   
    try {
      schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI).newSchema(
        getClass().getResource("/notes.xsd"));
    }
    catch (SAXException e) {
      throw new RuntimeException("Error obtaining Schema", e);
    }

    JaxbContextResolver resolver = new JaxbContextResolver().withSchema(schema)
      .formatOutput(true);
  
    Set<Object> single = Sets.newHashSet();
    single.add(resolver);
    singletons = Collections.unmodifiableSet(single);
  }
  
  public Set<Object> getSingletons() {
    return singletons;
  }
}
With the above, Unmarshallers are provided that utilize the supplied schema when unmarshalling the received payload. When an error occurs during unmarshalling, Jersey catches the javax.xml.bind.UnmarshalException, wraps it in a javax.ws.rs.WebApplicationException and throws the same. If one desires to customize the response sent back to the consumer, the only option is to create a javax.ws.rs.ext.ExceptionMapper for javax.ws.rs.WebApplicationException and interrogate the cause to determine whether it was a javax.xml.bind.UnmarshalException. I could not find a way to map the unmarshal exception thrown by JAXB directly to an ExceptionMapper. If anyone has done so, I would love to hear about their solution.
class WebApplicationExceptionMapper implements ExceptionMapper<WebApplicationException> {
   public Response toResponse(WebApplicationException e) {
      if (e.getCause() instanceof UnmarshalException) {
         return Response.status(404).entity("Tsk Tsk, XML is horrid and you provide the worst possible one?").build();
      }
      else {
         return Response.....
      }
  }
}
Sometimes, one does not need the full-fledged validation benefits of a schema and can make do with a ValidationEventHandler. In such a case, one can provide the JaxbContextResolver with an instance of a javax.xml.bind.ValidationEventHandler. The handler could then be configured to throw a custom exception which can be mapped to a custom response using an ExceptionMapper as shown below. This is the approach that appears in the Jersey mailing list entry:
JaxbContextResolver resolver = new JaxbContextResolver().withValidationEventHandler(new ValidationEventHandler() {
    public boolean handleEvent(ValidationEvent event) {
      if (event.getSeverity() == ValidationEvent.WARNING) {
        // Log and return
        return true;
      }
      throw new CustomRuntimeException(event);
    }
  });

@Provider
class CustomRuntimeExceptionMapper implements ExceptionMapper<CustomRuntimeException> {
   public Response toResponse(CustomRuntimeException e) {
      return Response.status(400).entity("Oooops:" + e).build();
   }
}
Note that throwing a custom Exception and catching the same via an ExceptionMapper will work only if one does NOT provide a Schema. Once a schema is in place, the exception will be caught by JAXB and swallowed and one has to catch WebApplicationException and provide a custom response as described earlier.

An example provided herewith demonstrates a simple Notes service that manages the life cycle of a Note. It employs Guice to inject dependencies into Resource and Sub-Resource classes. The application also demonstrates the JaxbContextResolver and the registration of a schema for validating a received Note payload. Quite sweet actually. The details of integrating Guice and Jersey are not covered in this BLOG as there is already a pretty thorough BLOG by Iqbal Yusuf that describes the same.

Download the maven example by clicking HERE, extract the archive and simply execute "mvn install" at the root level to see a client-server interaction. If any passer-by is a JAXB guru or has any tips on the Resolver, I would love to hear them. Enjoy!

Friday, September 9, 2011

Documenting Web Services with Enunciate


Understanding the workings of a Web Service from the perspective of a consumer, be it REST or SOAP based, is not the easiest of tasks. Wading through a WADL or a WSDL is not fun, at least not for me. I have been looking into an open source project that has been around for some time called Enunciate and am pretty impressed by its capabilities.

Enunciate provides automated documentation of your resources and end points. It also has the ability to generate client artifacts for consuming the JAX-RS services in many different programming languages.

The process itself is quite simple. After creating your web service, you have Enunciate run as part of your build process and it generates nice HTML based documentation of your service along with client code (Yeah!) in different languages to consume the service. Using annotation based web services facilitates this nice magic I guess.

To experience enunciate for myself, I created a simple web service that exposes SOAP and REST based calls of the same underlying API.
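
The service itself is just annotated JAX-RS and JAX-WS code. A rough sketch of what such a pair of endpoints could look like is below; the class names, paths and return values are illustrative and not necessarily identical to what is in the downloadable example:
import javax.jws.WebMethod;
import javax.jws.WebService;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;

// GreetingResource.java - the REST flavor of the API
@Path("/greeting")
public class GreetingResource {
  @GET
  @Path("/{name}")
  @Produces("text/plain")
  public String greet(@PathParam("name") String name) {
    return "Hello " + name;
  }
}

// GreetingWebService.java - the SOAP flavor of the same underlying API
@WebService(serviceName = "GreetingWebService")
public class GreetingWebService {
  @WebMethod
  public String greet(String name) {
    return "Hello " + name;
  }
}
Enunciate picks up these annotations at build time to produce the documentation and the client artifacts.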

A few simple criteria that I had for my sample application:
  • Documentation should be available at the Root context
  • Resources and Services must also be available at the Root context
To my pom, I added the necessary enunciate dependencies as shown below:
<dependencies>
  ....
  <dependency>
    <groupId>org.codehaus.enunciate</groupId>
    <artifactId>enunciate-rt</artifactId>
    <version>1.24</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    ...
    <plugin>
      <groupId>org.codehaus.enunciate</groupId>
      <artifactId>maven-enunciate-plugin</artifactId>
      <version>1.24</version>
      <configuration>
        <configFile>enunciate.xml</configFile>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>assemble</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
To customize where my service's resources will be available, i.e., at the ROOT context, I created an enunciate.xml file:
<enunciate xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:noNamespaceSchemaLocation="http://enunciate.codehaus.org/schemas/enunciate-1.21.xsd">
  
	<services>
		<rest defaultRestSubcontext="/"/>
		<soap defaultSoapSubcontext="/"/>
	</services>

	<modules>
		<spring-app disabled="true" />
		<docs docsDir="/" splashPackage="com.welflex.rest" title="Greetings Web Service API"
			copyright="Welflex"/>
	</modules>
</enunciate>
After building and running the web service, Enunciate produces a nicely generated introduction page.


Navigating the application, one can view the Greeting Resource and Greeting Web Service end point documentation.

Generated libraries for consuming the service are available for download as well, along with documentation on how to use them. Pretty sweet, huh?


A maven example demonstrating the above is available for DOWNLOAD HERE

Once you download the example, execute:
>mvn install
>mvn jetty:run-exploded

Access the service at http://localhost:8080

Note that executing mvn jetty:run would give me 404's; my only option was run-exploded. Enunciate creates an altered web.xml with its own custom classes. I am also curious to know whether using Enunciate impacts the performance of the web service, and if so, by how much. Enunciate also facilitates skinning the documentation; I have not done so, but from their documentation it looks pretty straightforward.

I found that if one has a custom Servlet Filter and/or Listener defined in the project's web.xml then to get the same included into the final web.xml generated by enunciate, one would need to set the following in enunciate.xml.
<webapp mergeWebXML="src/main/webapp/WEB-INF/web.xml" />
Upon merging the same, my application started throwing 404's when attempting to access the documentation. In other words, using a custom servlet filter or listener with my use case of resources mounted at the root context does not seem to work and needs further investigation. Not being able to set custom Filters and Listeners is a major blocker.

Another tool that I have included in the pom is JAX-Doclets, which generates nice javadoc for the JAX-RS portion of the service. To view it, execute mvn site and navigate to the generated documentation.

Friday, February 25, 2011

Hadoop Pig and Java Map Reduce - A maven example

Recently I have been involved with the Hadoop family and, as always, would like to share :-) I am hoping to provide anyone interested in evaluating Map-Reduce and Apache Pig with a starter project for the same.

At the core of Hadoop lie HDFS and the ability to perform Map-Reduce operations on data. Leaning on my previous example of Cassandra Map Reduce, this BLOG will help demonstrate how Map-Reduce on Hadoop can be achieved using plain ole Java or its exotic cousin, Apache Pig.

Before getting started with the example, you will need to get Hadoop running in at least a pseudo-distributed mode. As a user of Ubuntu, I found the BLOG by Rahul Patodi to be a great start to installing Cloudera's Hadoop version. Alternatively, you can do the same by following the instructions on the Cloudera WIKI.

The example used in this BLOG writes a file of "comments" in JSON format to HDFS and subsequently demonstrates how Map-Reduce jobs can be executed on it either in Java or in Pig. The jobs themselves check for certain "key words" of interest within the posted comments, think Web Bot here :-). An example comments file could look like:
{"commenterId":"donaldduck","comment":"The world is a cave. James bond lives in a Cave.","country":"TANZANIA"}
{"commenterId":"factorypilot","comment":"Only a cave man could do this","country":"JAPAN"}
{"commenterId":"nemesis","comment":"Felix Lighter and James Bond work well together as they are cave men","country":"BRAZIL"}
{"commenterId":"jamesbond","comment":"James Bond would be dead without Q to help him.","country":"GERMANY"}

Java Map-Reduce
For the Java version, one would write a Mapper that extracts the comment key, checks it for occurrences of the words of interest and emits a count for each, while a Reducer in turn totals the results, as shown below:

public class CommentWordMapReduce {
  /**
   * Mapper
   */
  public static class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);

    private Text word = new Text();

    private static final String COMMENT_KEY = "comment";
    private static final Set<String> WORDS_TO_GET_COUNT_OF = new HashSet<String>(Arrays
        .asList(new String[] { "james", "2012", "cave", "walther", "bond" }));

    private final JSONParser parser = new JSONParser();
    
    
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException,
      InterruptedException {
      JSONObject jsonObj;
      try {
        jsonObj = (JSONObject) parser.parse(value.toString());
      }
      catch (ParseException e) {
        // Hmm unable to Parse the JSON, off to next record, better log though :-)
        e.printStackTrace();
        return;
      }
      String comment = jsonObj.get(COMMENT_KEY).toString();
      StringTokenizer tokenizer = new StringTokenizer(comment);

      while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken().toLowerCase();
        if (WORDS_TO_GET_COUNT_OF.contains(token)) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  /**
   * Reducer
   */
  public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
      InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  /**
   * @param inputPath The input file location from HDFS
   * @param outputPath Where to store results of the Map-Reduce
   */
  public boolean mapred(String inputPath, String outputPath) throws IOException,
    InterruptedException,
    ClassNotFoundException {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "process word count");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(WordMap.class);
    job.setReducerClass(WordReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    
    job.setNumReduceTasks(1);
    
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
    job.waitForCompletion(true);

    return job.isSuccessful();
  }
}
A unit test provided simply ensures that the expected count for each word of interest in fact matches up with the actual count, as shown below:
public void javaMapReduce() throws Exception {
   assertTrue(new CommentWordMapReduce().mapred(INPUT_HDFS_FILE, OUTPUT_HDFS_DIR));
   validateResult();
}
If you wish to see the results of the map-reduce job, then execute:
>hadoop fs -cat /users/junittest/wcresults/part-r-00000
2012 1828
bond 4490
cave 2769
james 3631
walther 921
Pig Map-Reduce:

For the Pig Map-Reduce equivalent of the example, there are two helper classes that I define: a custom LoadFunc (loader function) for the JSON file and a FilterFunc (filter) to include only the words of interest. The custom JSON loader is courtesy of Kim Vogt from GitHub, and the LikeFilter is a non-comprehensive version that I defined as follows:

public class LikeFilter extends FilterFunc {

  public Boolean exec(Tuple input) throws IOException {
    if (input == null || input.size() < 2) {
      // If the word and at least one filter value are not provided, return false.
      return Boolean.FALSE;
    }
   
    List<Object> elems = input.getAll();
    
    // First element is the word presented, for example "foo", "bar" or "bond"
    Object expected = input.getAll().get(0);
    
    // Subsequent elements are the filter conditions
    List<Object> comparators = elems.subList(1, elems.size());
    
    return comparators.contains(expected);
  }
}
Using the two classes, the Pig Script for the same map-reduce task looks like:
comments = LOAD '/users/junittest/comments' USING com.welflex.hadoop.pig.load.PigJsonLoader() AS (commentMap:map[]);
words = FOREACH comments GENERATE FLATTEN(TOKENIZE(LOWER(commentMap#'comment')));
filter_words = FILTER words BY com.welflex.hadoop.pig.func.LikeFilter($0, '2012', 'bond', 'cave', 'james', 'walther');
grouped = GROUP filter_words BY $0;
counts = FOREACH grouped GENERATE group,COUNT(filter_words);
store counts INTO '/users/junittest/wcresults';
The unit test for Pig simply registers related jars and executes the script as follows:
public void pigMapReduce() throws Exception {
    // Get the jars required for the map reduce including custom functions
    Set<String> jars = getJarsForPig();

    // Set ExecType.MAPREDUCE if you want to run in a distributed mode
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE);

    for (String jar : jars) {
      // Register the jars for Pig
      pigServer.registerJar(jar);
    }

    // Execute the pig script
    pigServer.registerScript(WordCountMapReduceTest.class
        .getResource("/wordcount.pig").toURI().toURL().getFile());
    // Post validation to make sure the results of the map-red are correct.
    validateResult();
}
The goal of the above demonstration is to be able to write the Pig script in a single location in your maven project and run a unit test of it without having to re-write the script or hand-register custom jars. From the above example, one can see that the Pig script is far less complicated and less verbose than the Java version, and quite performant from an execution perspective as well. The example is organized as follows:
hadoop-example
|-- hadoop <-- Demonstrate the map - reduce of both Java and Pig versions
|   |-- pom.xml
|   `-- src
|       |-- main
|       |   |-- java
|       |   |   `-- com
|       |   |       `-- welflex
|       |   |           `-- hadoop
|       |   |               `-- hdfs
|       |   |                   |-- HdfsServiceImpl.java
|       |   |                   `-- HdfsService.java
|       |   |-- pig
|       |   |   `-- wordcount.pig
|       |   `-- resources
|       |       |-- core-site.xml
|       |       `-- log4j.properties
|       `-- test
|           |-- java
|           |   `-- com
|           |       `-- welflex
|           |           `-- hadoop
|           |               `-- mapred
|           |                   |-- CommentWordMapReduce.java
|           |                   `-- WordCountMapReduceTest.java
|           `-- resources
|               `-- comments
|-- pig-funcs <----- Contains the custom Pig artifacts
|   |-- pom.xml
|   `-- src
|       `-- main
|           `-- java
|               `-- com
|                   `-- welflex
|                       `-- hadoop
|                           `-- pig
|                               |-- func
|                               |   `-- LikeFilter.java
|                               `-- load
|                                   |-- JsonLineParser.java
|                                   `-- PigJsonLoader.java
`-- pom.xml
Simply execute:
>mvn test
to witness both the Java and Pig Map-Reduce versions in action, or import the project into your favorite IDE and do the same. You can easily change the example to count by country (see the snippet below), or augment the size of the file and try a non-local mode of map-reduce. I must state that when working with Hadoop and family, one needs to be careful with the versions one is working with. The above mentioned example works with Cloudera's Hadoop version 0.20.2-CDH3B4.
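
As a hint for the country-count variation mentioned above, the Java mapper would only need to emit the country field of each JSON record instead of tokenizing the comment. A sketch of the change, relying on the "country" key present in the sample data:
// Inside WordMap.map(), instead of tokenizing the comment:
String country = jsonObj.get("country").toString();
word.set(country.toLowerCase());
context.write(word, ONE);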

Download the example here and happy Pigging. Oink out, or should I say Grunt out ;-)

Tuesday, January 4, 2011

Apache Cassandra Map Reduce - An Example

Happy New Year :-)! Starting this year with a small BLOG on using Map Reduce with Cassandra. Map Reduce on Cassandra has been supported via Hadoop since version 0.6. Hadoop Map Reduce jobs can retrieve data from Cassandra and reduce the same. There is a word count example available with the Cassandra distribution. In this BLOG I will be using the Word Count example against the super column I had defined in my previous BLOG on Cassandra. With 0.7 of Cassandra there is support for reducing the output to Cassandra itself.

For the scope of the example, I have used the Comments Column Family from my previous BLOG, and my goal is to find counts of certain words that I am interested in within a time slice range. The example provided creates multiple comments on a single blog entry and then runs a Hadoop map-reduce job that outputs the counts of the words of interest into a column family that contains only the count of each word.
The Map Reduce job is provided a Slice Predicate specifying the time range of data to search on.
ByteBuffer startKey = ...;
ByteBuffer endKey = ....;

SliceRange range = new SliceRange();
range.setStart(startKey);
range.setFinish(endKey);
range.setCount(Integer.MAX_VALUE);
SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);

In the same vein, one could start different jobs across different time ranges to run simultaneously.
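
A rough sketch of that idea is shown below; newCommentCountJob() and timeRanges are hypothetical stand-ins for the job setup shown above (mapper, reducer and Cassandra input configuration) and for the list of [start, end] slice boundaries:
// Sketch only: newCommentCountJob() and timeRanges are hypothetical helpers
List<Job> jobs = new ArrayList<Job>();

for (ByteBuffer[] timeRange : timeRanges) { // e.g. one [start, end] pair per day
  Job job = newCommentCountJob();

  SliceRange range = new SliceRange();
  range.setStart(timeRange[0]);
  range.setFinish(timeRange[1]);
  range.setCount(Integer.MAX_VALUE);

  ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
      new SlicePredicate().setSlice_range(range));

  job.submit(); // non-blocking, so the jobs run concurrently
  jobs.add(job);
}

// Then wait for all of the submitted jobs to finish
for (Job job : jobs) {
  job.waitForCompletion(true);
}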

The Mapper is provided with the words we are interested in and only increments the counts of those words during the map process.
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException,
      InterruptedException {
      
      for (Map.Entry<ByteBuffer, IColumn> entry : columns.entrySet()) {
        IColumn column = entry.getValue();
        if (column == null) {
          continue;
        }
        
        IColumn textCol = column.getSubColumn(COMMENT_COL_NAME);
        String value = ByteBufferUtil.string(textCol.value());
     
        StringTokenizer itr = new StringTokenizer(value);
        while (itr.hasMoreTokens()) {
          String nextWord = itr.nextToken().toLowerCase();
          // Only trap expected words
          if (expectedWords.contains(nextWord)) {
            word.set(nextWord);
            context.write(word, one);
          }
        }
      }
    }
  }

The Reducer in turn reduces the results into a Cassandra column family called Word Count, similar to the word count example provided by Cassandra.
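
For reference, such a reducer looks roughly like the sketch below. It is modeled on Cassandra's bundled word count example rather than copied from the downloadable project; the "count" column name is an assumption, and ColumnFamilyOutputFormat must be configured as the job's output format for the emitted mutations to be applied to the Word Count column family:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCassandraReducer
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

  @Override
  protected void reduce(Text word, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }

    // Build a thrift Mutation carrying the total for this word
    Column countColumn = new Column();
    countColumn.setName(ByteBufferUtil.bytes("count"));
    countColumn.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
    countColumn.setTimestamp(System.currentTimeMillis());

    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.setColumn(countColumn);

    Mutation mutation = new Mutation();
    mutation.setColumn_or_supercolumn(cosc);

    // The row key is the word itself; the output format applies the mutation
    // to the configured output column family
    context.write(ByteBufferUtil.bytes(word.toString()),
        Collections.singletonList(mutation));
  }
}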

If you run the MapReduceTest, you can observe as output the following counts of the words I am interested in:
The word [james] has occured [1810] times
The word [2012] has occured [902] times
The word [cave] has occured [1368] times
The word [walther] has occured [481] times
The word [bond] has occured [2265] times
Note that I have made every word lower case in the example.
To run the example, download the same from HERE and run "mvn test". If I have not understood something correctly, please do let me know; although Map-Reduce is old, my experience with it is minimal :-)