Hadoop Example, AccessLogCountByHourOfDay

Inspired by an article written by Tom White, AWS author and developer:
Running Hadoop MapReduce on Amazon EC2 and Amazon S3

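Instead of counting hits by minute of the week, this version counts them by hour of the day. I just find that more interesting than knowing which minute of the week is most popular. The output is one line per hour of the day, each holding the hour, a tab, and the hit count: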
00\t

23\t

The main reason for writing this, however, is to provide a working example that will compile. I found a number of problems in the original post.

The trickiest one is the dependency on org.joda.time, which provides the DateTimeFormat, DateTimeFormatter and DateTime classes. You can download the library from http://joda-time.sourceforge.net/.
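To show what the timestamp parsing boils down to, here is a minimal sketch that turns an access-log timestamp into an hour bucket. It is not the code from this post: it swaps Joda-Time for java.time (so it needs Java 8 or later and no extra jar), and the class name HourBucketSketch and the sample timestamp are made up for illustration. The pattern string is the same one used in the listing.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical stand-in for getHourBucket(), using java.time instead of Joda-Time.
class HourBucketSketch {
    // Same pattern string the listing passes to DateTimeFormat.forPattern.
    // Locale.ENGLISH is an assumption so that "Oct" parses on any machine.
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Returns the hour (0-23) exactly as written in the log timestamp.
    static int hourBucket(String timestamp) {
        return OffsetDateTime.parse(timestamp, FORMATTER).getHour();
    }

    public static void main(String[] args) {
        // Made-up sample timestamp in Apache access-log style.
        System.out.println(hourBucket("10/Oct/2000:13:55:36 -0700")); // prints 13
    }
}
```

One difference to keep in mind: this sketch reports the hour as written in the log line. Joda's parseDateTime, as far as I know, converts the result to the formatter's zone (the JVM default unless withOffsetParsed() is used), so the two can disagree when the log offset differs from the local zone.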

java/AccessLogCountByHourOfDay.java:

package org.dkoopman;

import java.io.IOException;
import java.util.*;
import java.util.regex.*;
import org.joda.time.format.*;
import org.joda.time.DateTime;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class AccessLogCountByHourOfDay {

   public static class Map extends MapReduceBase 
                    implements Mapper<LongWritable, Text, IntWritable, LongWritable> {
     private final static LongWritable ONE = new LongWritable(1);
     private static Pattern p = Pattern
       .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
                " ([^ ]*) ([^ ]*).*");
     private static DateTimeFormatter formatter = DateTimeFormat
                                .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

     private IntWritable hour = new IntWritable();
     public void map(LongWritable key, Text value, 
                    OutputCollector<IntWritable, LongWritable> output, Reporter reporter) 
                    throws IOException {
       String line = value.toString();
       Matcher matcher = p.matcher(line);
       if (matcher.matches()) {
         String timestamp = matcher.group(4);
         hour.set(getHourBucket(timestamp));
         output.collect(hour, ONE);
       }
     } // end map

     private int getHourBucket(String timestamp) {
       DateTime dt = formatter.parseDateTime(timestamp);
       return dt.getHourOfDay();
     }
   } // end Map

  public static class Reduce extends MapReduceBase 
           implements Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
    
    public void reduce(IntWritable key, Iterator<LongWritable> values, 
             OutputCollector<IntWritable, LongWritable> output, Reporter reporter) 
             throws IOException {
      // Use a long so large counts are not silently truncated to int.
      long sum = 0;
      while (values.hasNext()) {
        sum += ((LongWritable) values.next()).get();
      }
      output.collect(key, new LongWritable(sum));
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 2)
    {
      System.err.println("Usage: AccessLogCountByHourOfDay <input path> <output path>");
      System.exit(-1);
    }
    JobConf conf = new JobConf(AccessLogCountByHourOfDay.class);
    // This doesn't work:
    //conf.setInputPaths(new Path(args[0]));
    //conf.setOutputPaths(new Path(args[1]));
    // Use this instead:
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(LongWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setNumReduceTasks(1);
    JobClient.runJob(conf);
  }

}
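Before shipping the jar to a cluster, it can help to check the regular expression against a log line locally. The sketch below reuses the pattern from the Map class in plain Java, with no Hadoop on the classpath. The class name LogLineRegexSketch, the helper timestampOf and the sample line are assumptions for illustration, not part of the job.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical local check of the access-log pattern used by the mapper.
class LogLineRegexSketch {
    // Same pattern as in the Map class: host, identity, user, [timestamp],
    // "request", status, bytes, then anything else on the line.
    private static final Pattern P = Pattern.compile(
        "([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\""
        + " ([^ ]*) ([^ ]*).*");

    // Returns the bracketed timestamp (group 4), or null when the line
    // does not match, which is the case the mapper silently skips.
    static String timestampOf(String line) {
        Matcher m = P.matcher(line);
        return m.matches() ? m.group(4) : null;
    }

    public static void main(String[] args) {
        // Made-up sample line in common log format.
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
            + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        System.out.println(timestampOf(line));             // prints 10/Oct/2000:13:55:36 -0700
        System.out.println(timestampOf("not a log line")); // prints null
    }
}
```

Lines that return null here are the ones the map method drops without emitting anything, so a quick run over a few real log lines shows whether the job would count them.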

To compile, you need the Joda-Time jar: download the tarball (see the link above) and unpack it.

mkdir -p classes/AccessLogCountByHourOfDay jar
javac -classpath /usr/share/hadoop/hadoop-0.19.1_01-core.jar:joda-time-1.6/joda-time-1.6.jar \
          -d classes/AccessLogCountByHourOfDay java/AccessLogCountByHourOfDay.java
jar -cvf jar/AccessLogCountByHourOfDay.jar -C classes/AccessLogCountByHourOfDay/ .

Lastly, to run the job:

hadoop jar -libjars joda-time-1.6/joda-time-1.6.jar \
   jar/AccessLogCountByHourOfDay.jar org.dkoopman.AccessLogCountByHourOfDay \
   httpd/logs output/httpd/logs/`date +"%Y-%m-%d_%H_%M_%S"`

This, of course, assumes you have httpd log files in the httpd/logs directory of your HDFS.
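Once the job has finished, the result is a small text file of hour and count pairs. As a rough sketch of how to pick out the busiest hour, the code below parses such lines, assuming the default TextOutputFormat layout of key, tab, value. The class name BusiestHourSketch and the sample counts are made up for illustration; they are not real results.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical post-processing of the job output ("hour<TAB>count" lines).
class BusiestHourSketch {
    // Returns {hour, count} for the line with the largest count,
    // or null when no line has the expected two tab-separated fields.
    static long[] busiest(List<String> lines) {
        long[] best = null;
        for (String line : lines) {
            String[] parts = line.split("\t");
            if (parts.length != 2) {
                continue; // skip blank or malformed lines
            }
            long hour = Long.parseLong(parts[0].trim());
            long count = Long.parseLong(parts[1].trim());
            if (best == null || count > best[1]) {
                best = new long[] {hour, count};
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Made-up counts, only to show the shape of the data.
        List<String> sample = Arrays.asList("0\t12", "13\t340", "23\t57");
        long[] top = busiest(sample);
        System.out.println("busiest hour: " + top[0] + " (" + top[1] + " hits)");
        // prints busiest hour: 13 (340 hits)
    }
}
```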

Comments or questions?

DaveK.
