Tuesday, January 4, 2011

Apache Cassandra Map Reduce - An Example

Happy New Year :-)! I am starting the year with a small blog post on using Map Reduce with Cassandra. Map Reduce on Cassandra has been supported via Hadoop since version 0.6: Hadoop Map Reduce jobs can retrieve data from Cassandra and reduce it. A word count example ships with the Cassandra distribution. In this post I will run a word count against the super column I defined in my previous blog post on Cassandra. As of Cassandra 0.7, the reduced output can also be written back to Cassandra itself.

For the scope of the example, I have used the Comments column family from my previous post, and my goal is to count occurrences of certain words of interest within a time slice. The example creates multiple comments on a single blog entry and then runs a Hadoop map reduce job that writes the count of each word of interest into a separate column family.
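Before the slice predicate comes into play, the job has to be pointed at Cassandra through ColumnFamilyInputFormat. A minimal sketch of that wiring follows; the driver/mapper class names and the "Blog"/"Comments" keyspace and column family names are my assumptions here, so adjust them to your own setup:

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver helper; CommentWordCountMapper is the mapper shown further below.
public static Job buildJob() throws Exception {
  Job job = new Job(new Configuration(), "commentwordcount");
  job.setJarByClass(CommentWordCountMapper.class);
  job.setMapperClass(CommentWordCountMapper.class);
  job.setInputFormatClass(ColumnFamilyInputFormat.class);

  // Tell the input format where Cassandra lives and what to read.
  ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
  ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
  ConfigHelper.setPartitioner(job.getConfiguration(),
      "org.apache.cassandra.dht.RandomPartitioner");
  ConfigHelper.setInputColumnFamily(job.getConfiguration(), "Blog", "Comments");

  // The output side (reducer, ColumnFamilyOutputFormat) is wired up similarly; see below.
  return job;
}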
The Map Reduce job is then given a SlicePredicate that defines the time range of columns to search. Note that setStart and setFinish on the SliceRange bound column names (the time-based comment columns in this example), not row keys:
ByteBuffer startTime = ...;
ByteBuffer endTime = ...;

SliceRange range = new SliceRange();
range.setStart(startTime);
range.setFinish(endTime);
range.setCount(Integer.MAX_VALUE); // return every column in the range
SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);

In the same vein, one could start separate jobs over different time ranges and run them simultaneously, as sketched below.
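A rough sketch of what that could look like, reusing the hypothetical buildJob() helper from above; timeSlices, holding pairs of column-name bounds, is an assumed input:

// Submit one job per disjoint time slice, then wait for all of them.
List<Job> jobs = new ArrayList<Job>();
for (ByteBuffer[] slice : timeSlices) {
  Job job = buildJob();
  SliceRange range = new SliceRange();
  range.setStart(slice[0]);
  range.setFinish(slice[1]);
  range.setCount(Integer.MAX_VALUE);
  ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
      new SlicePredicate().setSlice_range(range));
  job.submit(); // non-blocking, unlike waitForCompletion
  jobs.add(job);
}
for (Job job : jobs) {
  job.waitForCompletion(true);
}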

The Mapper is provided the set of words we are interested in and increments a word's count only when that word is seen during the map phase:
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException,
      InterruptedException {

      for (Map.Entry<ByteBuffer, IColumn> entry : columns.entrySet()) {
        IColumn column = entry.getValue();
        if (column == null) {
          continue;
        }

        // Each comment is a super column; pull out the comment text sub-column.
        IColumn textCol = column.getSubColumn(COMMENT_COL_NAME);
        if (textCol == null) {
          continue;
        }
        String value = ByteBufferUtil.string(textCol.value());

        // Tokenize the comment text and only trap the expected words.
        StringTokenizer itr = new StringTokenizer(value);
        while (itr.hasMoreTokens()) {
          String nextWord = itr.nextToken().toLowerCase();
          if (expectedWords.contains(nextWord)) {
            word.set(nextWord);
            context.write(word, one);
          }
        }
      }
    }
  }
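The mapper above references a few members that are initialized elsewhere in the class. A minimal sketch of how they might be declared; the word list here simply mirrors the words reported in the output further below:

  // Assumed declarations, following the standard Hadoop word count pattern:
  // the words we trap, a reusable output key and a constant count of one.
  private static final Set<String> expectedWords = new HashSet<String>(
      Arrays.asList("james", "2012", "cave", "walther", "bond"));
  private final Text word = new Text();
  private static final IntWritable one = new IntWritable(1);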

The Reducer in turn writes the aggregated counts into a Cassandra column family called Word Count, much like the word count example that ships with Cassandra.
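A sketch of such a reducer, modeled on the 0.7 word count example, might look like the following. The Thrift Mutation plumbing can differ slightly between Cassandra versions, and the job additionally needs job.setOutputFormatClass(ColumnFamilyOutputFormat.class) and ConfigHelper.setOutputColumnFamily(...) so that the mutations are actually applied:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the ones emitted by the mapper and writes one row per word, with a
// single "count" column, to the output column family.
public static class WordCountReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

  public void reduce(Text word, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }

    // Build a single column holding the count for this word.
    Column column = new Column();
    column.setName(ByteBufferUtil.bytes("count"));
    column.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
    column.setTimestamp(System.currentTimeMillis());

    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.setColumn(column);
    Mutation mutation = new Mutation();
    mutation.setColumn_or_supercolumn(cosc);

    // The row key is the word itself; ColumnFamilyOutputFormat applies the mutation.
    context.write(ByteBufferUtil.bytes(word.toString()), Collections.singletonList(mutation));
  }
}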

If you run the MapReduceTest, you can observe the following counts of the words I am interested in as output:
The word [james] has occurred [1810] times
The word [2012] has occurred [902] times
The word [cave] has occurred [1368] times
The word [walther] has occurred [481] times
The word [bond] has occurred [2265] times
Note that I have lower-cased every word in the example.
To run the example, download it from HERE and run "mvn test". If I have not understood something correctly, please do let me know; although Map Reduce is old, my experience with it is minimal :-)

4 comments:

Anonymous said...

Assuming you have a col family for URL and all the URLs on the web are in this table and you want to map reduce on a domain level, would you recommend creating a separate col family for domains and storing the map reduce values there?

Where do people normally store the values they receive from map reduce?

Anonymous said...

Hi, setRange is only applicable to columns in a rangeslice; it limits the number of columns returned by the query.

Anonymous said...

I meant setStart and setFinish on a slicerange

Burak Emre said...

hey man, you saved my day. :)
btw, it seems sixtydigits repository doesn't work anymore. javanet and google maven repositories also have missing dependencies.