Parallel Distributed Computing Example

You may have seen the article Hadoop Example, AccessLogCountByHourOfDay, which is a distributed computing solution using Hadoop. The purpose of this article is to dive into the theory behind it.

To understand the power of distributed computing, we need to step back and understand the problem. First we'll look at a command-line Java program that processes the HTTP log files one file at a time, one line at a time, until done. To speed up the job, we'll then look at another approach: multi-threaded; we should be able to get the job done faster if we break the job up into a set of sub-tasks and run them in parallel. Then we'll come to Hadoop and distributed computing. It's the same concept of breaking the job up into a set of sub-tasks, but rather than running on one server, we'll run on multiple servers in parallel.

At first you'd think that Hadoop would be the fastest, but in our basic example, you'll see that Hadoop isn't significantly faster. Why? The Hadoop overhead of scheduling the job and tracking the tasks is slowing us down. In order to see the power of Hadoop, we need much larger data sets. Think about our single-server approach for a minute. As we ramp up the size and/or number of files to process, there is going to be a point where the server hits resource limitations (CPU, RAM, disk). If we have 4 threads making effective use of 4 CPU cores, we may be able to do the job 4 times faster than single-threaded. But if we have a terabyte of data to process and it takes, say, 100 seconds per GB, it's going to take 100,000 seconds to finish (that's more than a day). With Hadoop, we can scale out horizontally. What if we had a 1000-node Hadoop cluster? Suddenly the overhead of scheduling the job and tracking the tasks is minuscule in comparison to the whole job. The whole job may complete in 100 seconds or so! We went from over a day to less than 2 minutes. Wow.
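
To make that back-of-the-envelope math concrete, here is a tiny calculation. The 100 seconds per GB rate and the fixed scheduling overhead are illustrative assumptions, not measurements from any of the runs below:

// Rough scaling estimate: 1 TB of logs at an assumed 100 seconds per GB.
public class ScalingEstimate {
  public static void main(String[] args) {
    double secondsPerGB = 100.0;        // assumed single-core processing rate
    double dataGB = 1000.0;             // 1 TB of logs
    double overheadSec = 30.0;          // assumed fixed job scheduling/tracking overhead

    double singleThread = dataGB * secondsPerGB;               // 100,000 s -- more than a day
    double fourThreads  = singleThread / 4;                    // ideal speedup on 4 cores
    double thousandNodes = singleThread / 1000 + overheadSec;  // ~130 s; the overhead barely matters at this scale

    System.out.printf("single thread: %,.0f s%n", singleThread);
    System.out.printf("4 threads:     %,.0f s%n", fourThreads);
    System.out.printf("1000 nodes:    %,.0f s%n", thousandNodes);
  }
}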

Please note: the single-threaded and multi-threaded examples in this article are not using the Map/Reduce algorithm. This is intentional. I'm trying to demonstrate the evolution of thought. When we think about how to solve the problem, the first thing that comes to mind is to walk through the files, one line at a time, and accumulate the result. Then we realize we could split the job up into threads and gain some speed. The last evolution is the Map/Reduce algorithm across a distributed computing platform.
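
For reference, the Map/Reduce formulation of this problem is: the map step turns each log line into an (hour, 1) pair, and the reduce step sums the ones for each hour. A minimal local sketch of that idea (plain Java, nothing to do with Hadoop yet) looks like this:

import java.util.Map;
import java.util.TreeMap;

// Conceptual Map/Reduce on a tiny in-memory data set (illustration only, not the article's code).
public class MapReduceSketch {
  public static void main(String[] args) {
    // The "map" step would turn each log line into an (hour, 1) pair;
    // here the mapped hours are simply hard-coded.
    int[] mappedHours = {13, 13, 7, 13, 7, 0};

    // The "reduce" step sums the ones emitted for each hour (the key).
    Map<Integer, Long> counts = new TreeMap<Integer, Long>();
    for (int hour : mappedHours) {
      Long current = counts.get(hour);
      counts.put(hour, current == null ? 1L : current + 1);
    }
    System.out.println(counts);   // prints {0=1, 7=2, 13=3}
  }
}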

Let’s dive in….

Single Thread, Local Implementation:

java/AccessLogByHourOfDay.java

package org.dkoopman;

import java.io.File;
import java.io.IOException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.util.regex.*;
import org.joda.time.format.*;
import org.joda.time.DateTime;

public class AccessLogByHourOfDay {

  private static Pattern p = Pattern.compile(
                 "([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
                 " ([^ ]*) ([^ ]*).*");
  private static DateTimeFormatter formatter = DateTimeFormat
                      .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

  public static void main(String[] args) throws IOException {

    File inputDir = new File(args[0].toString());
    // Test input path is directory
    if ( !inputDir.isDirectory() ) {
      System.out.println("ERROR: " + inputDir.toString() + " is not a directory%n");
      System.exit(1);
    }

    // Initialize result array
    long[] result = new long[24];
    for (int i=0; i<24; i++)
      result[i] = 0;

    // Loop through files in dir (no recursion)
    String[] files = inputDir.list();
    for (int i=0; i<files.length; i++) {
      File inputFile = new File(inputDir.getPath() + "/" + files[i]);
      if ( inputFile.isFile() )
      {
        // System.out.println("Processing: " + inputFile.getPath());
        BufferedReader inputStream = null;
        try {
          inputStream = new BufferedReader(new FileReader(inputFile.getPath()));
          String line;
          while ( (line = inputStream.readLine()) != null) {
            Matcher m = p.matcher(line);
            if ( m.matches()) {
              String timestamp = m.group(4);
              int hour = formatter.parseDateTime(timestamp).getHourOfDay();
              result[hour]++;
            }
          }
        } finally {
          if (inputStream != null ) inputStream.close();
        }
      }
    }

    for (int i=0; i<24; i++)
      System.out.println("" + i + "\t" + result&#91;i&#93;);
  }
}
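
The regular expression in this program follows the Apache common/combined log format; capture group 4 is the bracketed timestamp. As a quick sanity check you can test the pattern on its own (the sample log line below is made up, not from my data):

import java.util.regex.*;

// Standalone check of the log-parsing pattern against a made-up sample line.
public class PatternCheck {
  public static void main(String[] args) {
    Pattern p = Pattern.compile(
        "([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\" ([^ ]*) ([^ ]*).*");
    String sample = "10.0.0.1 - - [25/Apr/2009:11:12:54 -0500] \"GET /index.html HTTP/1.1\" 200 1234";
    Matcher m = p.matcher(sample);
    if (m.matches()) {
      System.out.println("timestamp = " + m.group(4));  // 25/Apr/2009:11:12:54 -0500
      System.out.println("request   = " + m.group(5));  // GET /index.html HTTP/1.1
    }
  }
}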

To compile, first you must have Joda Time (http://joda-time.sourceforge.net/) unpacked in your class dir:

wget http://softlayer.dl.sourceforge.net/sourceforge/joda-time/joda-time-1.6.tar.gz
tar -xzf joda-time-1.6.tar.gz
mkdir class
cd class
jar -xvf ../joda-time-1.6/joda-time-1.6.jar
cd ..

To compile our program:

javac -classpath class/ -d class/ java/AccessLogByHourOfDay.java

And finally to run it:

$ cd class
$ time java -Xmx512m org/dkoopman/AccessLogByHourOfDay ../input
0       807743
1       918482
2       837200
3       829134
4       1095366
5       1105468
6       1107887
7       1047049
8       1012386
9       1088972
10      1087737
11      1124017
12      1084948
13      1060728
14      1070394
15      1082092
16      1098252
17      1079268
18      1081189
19      1078591
20      1083834
21      986306
22      837602
23      842438

real    5m16.764s
user    4m34.373s
sys     0m8.689s

In the previous run, the ../input directory contains 1 Apache log file totaling 6.7 GB. You'll want to replace ../input with the path to your httpd log files.

Multi-threaded Implementation:

java/SyncCounter.java

This is a helper class that uses a synchronized method, which ensures only one thread can increment the counter at a time!

package org.dkoopman;

public class SyncCounter {
  private long[] result;
  public SyncCounter(int size) {
    result = new long[size];
    for (int i=0; i<result.length; i++)
      result[i] = 0;
  }
  synchronized void increment(int key) {
    result[key]++;
  }
  public long get(int key) {
    return result[key];
  }
}
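
As an aside (this is not part of the article's code), java.util.concurrent ships an AtomicLongArray that does the same job without explicit locking. A drop-in alternative to SyncCounter might look like this:

package org.dkoopman;

import java.util.concurrent.atomic.AtomicLongArray;

// Lock-free alternative to SyncCounter (a sketch; the timings below use SyncCounter).
public class AtomicCounter {
  private final AtomicLongArray result;
  public AtomicCounter(int size) {
    result = new AtomicLongArray(size);   // elements start at 0
  }
  public void increment(int key) {
    result.incrementAndGet(key);          // atomic increment, no monitor needed
  }
  public long get(int key) {
    return result.get(key);
  }
}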

java/AccessLogByHourOfDayMT.java

package org.dkoopman;

import org.dkoopman.SyncCounter;
import java.lang.*;
import java.io.File;
import java.io.IOException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.util.regex.*;
import org.joda.time.format.*;
import org.joda.time.DateTime;


public class AccessLogByHourOfDayMT {

  private static Pattern p = Pattern.compile(
                 "([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
                 " ([^ ]*) ([^ ]*).*");
  private static DateTimeFormatter formatter = DateTimeFormat
                      .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

  private static SyncCounter result;

  private static class ReadFileAndCount implements Runnable {
    private File inputFile;
    public void run() {
      try {
        readLines();
      }
      catch ( IOException e ) {
        System.out.println("Caught Exception: " + e.toString() + ": " + e.getMessage());
      }
      //catch (InterruptedException e) {
      //  System.out.format("%s: I was not done with %s%n", Thread.currentThread().getName(), inputFile.toString());
      //  System.out.println("Caught Exception: " + e.toString() + ": " + e.getMessage());
      //}
    }
    private void readLines() throws IOException {
      BufferedReader inputStream = null;
      try {
        inputStream = new BufferedReader(new FileReader(inputFile.getPath()));
        String line;
        while ( (line = inputStream.readLine()) != null ) {
          Matcher m = p.matcher(line);
          if ( m.matches()) {
            String timestamp = m.group(4);
            try {
              int hour = formatter.parseDateTime(timestamp).getHourOfDay();
              result.increment(hour);
            } catch (Exception e) {
              System.out.println("Caught Exception: " + e.toString() + ": " + e.getMessage());
            }
          }
        }
      } finally {
        if (inputStream != null ) inputStream.close();
        System.out.println(inputFile.toString() + " is closed");
      }
    }
    public ReadFileAndCount(File f) {
      inputFile = f;
    }
  }



  public static void main(String[] args) {

    File inputDir = new File(args[0].toString());
    // Test input path is directory
    if ( !inputDir.isDirectory() ) {
      System.out.println("ERROR: " + inputDir.toString() + " is not a directory%n");
      System.exit(1);
    }

    // Initialize result array
    result = new SyncCounter(24);

    // Loop through files in dir (no recursion)
    String[] files = inputDir.list();
    Thread[] t = new Thread[files.length];
    for (int i=0; i<files.length; i++) {
      File inputFile = new File(inputDir.getPath() + "/" + files[i]);
      if ( inputFile.isFile() )
      {
        //try {
          t[i] = new Thread(new ReadFileAndCount(inputFile));
          t[i].start();
        //}
        //catch ( InterruptedException e ) {
        //  System.out.println("Caught Exception2: " + e.toString() + ": " + e.getMessage());
        //}
      }
    }

    boolean done = false;
    while (!done)
    {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        System.out.println("Caught InterruptedException1: " + e.toString() + ": " + e.getMessage());
      }
      done = true;
      for (int i=0; i<files.length; i++)
        done = done && (t[i] == null || !t[i].isAlive());  // t[i] is null when the directory entry was not a regular file
    }

    for (int i=0; i<24; i++)
      System.out.println("" + i + "\t" + result.get(i));
  }
}
Note: This program has some serious limitations.  Notice that it creates a separate thread for every file it processes without any concern for how many files there are.  Make sure you don't have too many files in your input directory, or this program will run out of heap space, or at the very least become inefficient.  Also, my understanding of how to effectively use exceptions in Java is limited.  I'm sure there are some things I should be doing to improve this.
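
One way around the thread-per-file problem is a fixed-size thread pool from java.util.concurrent, which also replaces the sleep/isAlive polling loop with shutdown()/awaitTermination(). Here is a sketch of how main() inside AccessLogByHourOfDayMT could be rewritten (this is not the code that produced the timings below, and it needs additional imports for ExecutorService, Executors, and TimeUnit):

  // Sketch: bound the number of worker threads no matter how many files there are.
  public static void main(String[] args) throws InterruptedException {
    File inputDir = new File(args[0]);
    if (!inputDir.isDirectory()) {
      System.out.println("ERROR: " + inputDir + " is not a directory");
      System.exit(1);
    }
    result = new SyncCounter(24);

    ExecutorService pool = Executors.newFixedThreadPool(4);   // 4 is an assumed pool size
    for (File f : inputDir.listFiles()) {
      if (f.isFile()) {
        pool.submit(new ReadFileAndCount(f));                 // same Runnable as above
      }
    }
    pool.shutdown();                                          // no new tasks accepted
    pool.awaitTermination(24, TimeUnit.HOURS);                // wait for all files to finish

    for (int i=0; i<24; i++)
      System.out.println(i + "\t" + result.get(i));
  }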

To compile:

javac -classpath class/ -d class/ java/SyncCounter.java
javac -classpath class/ -d class/ java/AccessLogByHourOfDayMT.java

To run:

$ cd class
$ time java -Xmx512m org/dkoopman/AccessLogByHourOfDayMT ../input
../input/access_log.1 is closed
0       807743
1       918482
2       837200
3       829134
4       1095366
5       1105468
6       1107887
7       1047049
8       1012386
9       1088972
10      1087737
11      1124017
12      1084948
13      1060728
14      1070394
15      1082092
16      1098252
17      1079268
18      1081189
19      1078591
20      1083834
21      986306
22      837602
23      842438

real    4m45.895s
user    9m14.291s
sys     0m10.579s

Not sure why I got a 30-second improvement running against a single file. The more interesting run is in the "Run All 3 Again, With Larger Data Set" section below, running against twelve 6.7 GB files.

Hadoop, Distributed Computing, Implementation:

java/AccessLogByHourOfDayMapReduce.java

package org.dkoopman;

import java.io.IOException;
import java.util.*;
import java.util.regex.*;
import org.joda.time.format.*;
import org.joda.time.DateTime;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class AccessLogByHourOfDayMapReduce {

   public static class Map extends MapReduceBase
          implements Mapper<LongWritable, Text, IntWritable, LongWritable> {
     private final static LongWritable ONE = new LongWritable(1);
     private static Pattern p = Pattern
       .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
                " ([^ ]*) ([^ ]*).*");
     private static DateTimeFormatter formatter = DateTimeFormat
             .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

     private IntWritable hour = new IntWritable();
     public void map(LongWritable key, Text value,
                 OutputCollector<IntWritable, LongWritable> output, Reporter reporter)
                 throws IOException {

       String line = value.toString();
       Matcher matcher = p.matcher(line);
       if (matcher.matches()) {
         String timestamp = matcher.group(4);
         hour.set(getHourBucket(timestamp));
         output.collect(hour, ONE);
       }
     } // end map

     private int getHourBucket(String timestamp) {
       DateTime dt = formatter.parseDateTime(timestamp);
       return dt.getHourOfDay();
     }
   } // end Map

  public static class Reduce extends MapReduceBase
         implements Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {

    public void reduce(IntWritable key, Iterator<LongWritable> values,
           OutputCollector<IntWritable, LongWritable> output, Reporter reporter)
           throws IOException {
      long sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new LongWritable(sum));
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 2)
    {
      System.err.println("Usage: AccessLogByHourOfDayMapReduce <input path> <output path>");
      System.exit(-1);
    }
    JobConf conf = new JobConf(AccessLogByHourOfDayMapReduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(LongWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);   // the reducer doubles as a combiner; partial sums are valid because addition is associative
    conf.setReducerClass(Reduce.class);
    conf.setNumReduceTasks(1);             // a single reducer gathers all 24 hour buckets
    JobClient.runJob(conf);
  }

}

To compile our program:

javac -classpath /usr/share/hadoop/hadoop-0.19.1_01-core.jar:class/ \
         -d class/ java/AccessLogByHourOfDayMapReduce.java

To make a jar:

mkdir -p jar
jar -cvf jar/AccessLogByHourOfDayMapReduce.jar -C class/ .

And finally to run it:

$ time hadoop jar jar/AccessLogByHourOfDayMapReduce.jar org.dkoopman.AccessLogByHourOfDayMapReduce \
  input/1 output/1/`date +"%Y-%m-%d_%H_%M_%S"`
09/04/25 11:12:54 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. 
Applications should implement Tool for the same.
09/04/25 11:12:54 INFO mapred.FileInputFormat: Total input paths to process : 1
09/04/25 11:12:55 INFO mapred.JobClient: Running job: job_200904081737_14677
09/04/25 11:12:56 INFO mapred.JobClient:  map 0% reduce 0%
09/04/25 11:13:09 INFO mapred.JobClient:  map 2% reduce 0%
[snip]
09/04/25 11:15:58 INFO mapred.JobClient:  map 100% reduce 100%
09/04/25 11:16:00 INFO mapred.JobClient: Job complete: job_200904081737_14677
09/04/25 11:16:00 INFO mapred.JobClient: Counters: 16
09/04/25 11:16:00 INFO mapred.JobClient:   File Systems
09/04/25 11:16:00 INFO mapred.JobClient:     HDFS bytes read=7193481323
09/04/25 11:16:00 INFO mapred.JobClient:     HDFS bytes written=247
09/04/25 11:16:00 INFO mapred.JobClient:     Local bytes read=36294
09/04/25 11:16:00 INFO mapred.JobClient:     Local bytes written=76254
09/04/25 11:16:00 INFO mapred.JobClient:   Job Counters
09/04/25 11:16:00 INFO mapred.JobClient:     Launched reduce tasks=1
09/04/25 11:16:00 INFO mapred.JobClient:     Launched map tasks=119
09/04/25 11:16:00 INFO mapred.JobClient:     Data-local map tasks=119
09/04/25 11:16:00 INFO mapred.JobClient:   Map-Reduce Framework
09/04/25 11:16:00 INFO mapred.JobClient:     Reduce input groups=24
09/04/25 11:16:00 INFO mapred.JobClient:     Combine output records=2592
09/04/25 11:16:00 INFO mapred.JobClient:     Map input records=24447083
09/04/25 11:16:00 INFO mapred.JobClient:     Reduce output records=24
09/04/25 11:16:00 INFO mapred.JobClient:     Map output bytes=293364996
09/04/25 11:16:00 INFO mapred.JobClient:     Map input bytes=7193042944
09/04/25 11:16:00 INFO mapred.JobClient:     Combine input records=24447083
09/04/25 11:16:00 INFO mapred.JobClient:     Map output records=24447083
09/04/25 11:16:00 INFO mapred.JobClient:     Reduce input records=2592

real    3m6.908s
user    0m1.590s
sys     0m0.201s
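
The hourly counts themselves land in the output directory on HDFS rather than on stdout. With a single reducer and the default TextOutputFormat, they should be readable with something like the following (the timestamped directory name is whatever the date command produced for your run):

hadoop fs -cat output/1/<run_timestamp>/part-00000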

The Hadoop cluster this job ran on has 4 data nodes, each with 2 dual-core CPUs and 16 GB of RAM. Pretty powerful, but commodity-class servers. Even on this data set it was already faster than the single-threaded, old-school way of doing it.

Note: In my previous trials, I had about thirty files that totaled only 150 MB. In that case, the single-threaded program took about 3 seconds, and Hadoop took about 45 seconds. This shows that Hadoop has significant task-scheduling overhead when dealing with a small amount of data. I've heard that to get the most out of Hadoop, each mapper should take a minute or more to complete, to be worth the overhead of distributing the job. In my opinion it doesn't have to be quite that long, but you do need to make sure you're selecting Hadoop for large Map/Reduce jobs. Large means many GB at a minimum, but it's really going to shine in the TB range.

Next up, let's run the whole thing all over again, this time with a much bigger data set.

Run All 3 Again, With Larger Data Set

In this run, I have 12 files, each 6.7 GB, totaling about 80 GB.
Even this isn't big enough to maximize the effect of Hadoop, but it should give better results. I'd expect the single-threaded run to take nearly exactly 12 times as long. The first one took 5m16.764s, so about 63.4 minutes is my prediction. Let's see....

Single Thread, Local

$ time java -Xmx512m org/dkoopman/AccessLogByHourOfDay ../input
0       9692916
1       11021784
2       10046400
3       9949608
4       13144392
5       13265616
6       13294644
7       12564588
8       12148632
9       13067664
10      13052844
11      13488204
12      13019376
13      12728736
14      12844728
15      12985104
16      13179024
17      12951216
18      12974268
19      12943092
20      13006008
21      11835672
22      10051224
23      10109256

real    58m9.882s
user    54m40.025s
sys     2m0.463s

Multi-Threaded

$ time java -Xmx512m org/dkoopman/AccessLogByHourOfDayMT ../input
0       9692916
1       11021784
2       10046400
3       9949608
4       13144392
5       13265616
6       13294644
7       12564588
8       12148632
9       13067664
10      13052844
11      13488204
12      13019376
13      12728736
14      12844728
15      12985104
16      13179024
17      12951216
18      12974268
19      12943092
20      13006008
21      11835672
22      10051224
23      10109256

real    45m8.621s
user    46m56.195s
sys     1m53.119s

Hadoop

$ time hadoop jar jar/AccessLogByHourOfDayMapReduce.jar org.dkoopman.AccessLogByHourOfDayMapReduce \
    input/81 output/`date +"%Y-%m-%d_%H_%M_%S"`
09/04/24 19:17:43 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. 
Applications should implement Tool for the same.
09/04/24 19:17:43 INFO mapred.FileInputFormat: Total input paths to process : 12
09/04/24 19:17:44 INFO mapred.JobClient: Running job: job_200904081737_13460
09/04/24 19:17:45 INFO mapred.JobClient:  map 0% reduce 0%
09/04/24 19:18:10 INFO mapred.JobClient:  map 1% reduce 0%
[snip]
09/04/24 19:37:14 INFO mapred.JobClient:  map 100% reduce 100%
09/04/24 19:37:15 INFO mapred.JobClient: Job complete: job_200904081737_13460
09/04/24 19:37:15 INFO mapred.JobClient: Counters: 16
09/04/24 19:37:15 INFO mapred.JobClient:   File Systems
09/04/24 19:37:15 INFO mapred.JobClient:     HDFS bytes read=79128294553
09/04/24 19:37:15 INFO mapred.JobClient:     HDFS bytes written=273
09/04/24 19:37:15 INFO mapred.JobClient:     Local bytes read=63516
09/04/24 19:37:15 INFO mapred.JobClient:     Local bytes written=503110
09/04/24 19:37:15 INFO mapred.JobClient:   Job Counters
09/04/24 19:37:15 INFO mapred.JobClient:     Launched reduce tasks=1
09/04/24 19:37:15 INFO mapred.JobClient:     Launched map tasks=1198
09/04/24 19:37:15 INFO mapred.JobClient:     Data-local map tasks=1197
09/04/24 19:37:15 INFO mapred.JobClient:   Map-Reduce Framework
09/04/24 19:37:15 INFO mapred.JobClient:     Reduce input groups=24
09/04/24 19:37:15 INFO mapred.JobClient:     Combine output records=28536
09/04/24 19:37:15 INFO mapred.JobClient:     Map input records=268917913
09/04/24 19:37:15 INFO mapred.JobClient:     Reduce output records=24
09/04/24 19:37:15 INFO mapred.JobClient:     Map output bytes=3227014956
09/04/24 19:37:15 INFO mapred.JobClient:     Map input bytes=79123472384
09/04/24 19:37:15 INFO mapred.JobClient:     Combine input records=268941913
09/04/24 19:37:15 INFO mapred.JobClient:     Map output records=268917913
09/04/24 19:37:15 INFO mapred.JobClient:     Reduce input records=4536

real    19m38.629s
user    0m3.265s
sys     0m0.378s

Well, this was fun for me, getting a feel for Hadoop. Hope you enjoyed it.
