Avro MapReduce Word Count Example

In this post, we will walk through the classic word count example implemented with MapReduce, writing its output as an Avro data file in the Hadoop Distributed File System.

Prerequisite:

In order to execute the MapReduce word count program given in this post, the avro-mapred-1.7.4-hadoop2.jar file needs to be present in the $HADOOP_HOME/share/hadoop/common/lib directory. This jar contains the classes used for Avro serialization and deserialization within the MapReduce framework. For instructions on installing and integrating Avro with Hadoop 2, refer to the post Avro Installation. If the wrong version of this jar file is present in the common/lib directory, the job will fail with a variety of errors and exceptions, so be careful to pick the jar version that matches your Hadoop distribution and place it in the right directory.

It is usually preferable to place this jar in the $HADOOP_HOME/share/hadoop/common/lib directory, because that location is included in the Hadoop classpath by default.
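The copy-and-verify step might look like the following sketch. It assumes the jar has already been downloaded to the current directory and that $HADOOP_HOME points at your Hadoop installation; adjust the paths for your setup.

```shell
# Copy the Avro MapReduce jar into the directory that is on the
# Hadoop classpath by default (assumes $HADOOP_HOME is set).
cp avro-mapred-1.7.4-hadoop2.jar $HADOOP_HOME/share/hadoop/common/lib/

# Confirm the jar is now visible on the Hadoop classpath.
hadoop classpath | tr ':' '\n' | grep avro-mapred
```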

Avro MapReduce Word Count:

Once Avro is set up in the Hadoop cluster, we can run the MapReduce program below. Copy the code snippet into a file named MapReduceAvroWordCount.java. It is a traditional MapReduce word count program, except that it reads its input from plain text files and writes its output to an Avro data file as Avro Pair&lt;CharSequence, Integer&gt; records instead of text.

import java.io.IOException;
import java.util.*;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceAvroWordCount extends Configured implements Tool {

  public static class Map
    extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class Reduce extends Reducer<Text, IntWritable,
      AvroWrapper<Pair<CharSequence, Integer>>, NullWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(new AvroWrapper<Pair<CharSequence, Integer>>(
                        new Pair<CharSequence, Integer>(key.toString(), sum)),
                    NullWritable.get());
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MapReduceAvroWordCount <input path> <output path>");
      return -1;
    }

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Job job = Job.getInstance(conf, "wordcount");
    job.setJarByClass(MapReduceAvroWordCount.class);

    // Set the output key schema to an Avro Pair of <string, int>.
    AvroJob.setOutputKeySchema(job,
        Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.INT)));
    job.setOutputValueClass(NullWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setSortComparatorClass(Text.Comparator.class);
    job.setNumReduceTasks(1);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Delete the output directory if it already exists.
    if (fs.exists(new Path(args[1]))) {
      fs.delete(new Path(args[1]), true);
    }

    job.waitForCompletion(true);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(),
                             new MapReduceAvroWordCount(), args);
    System.exit(res);
  }
}

In the above code, the AvroWrapper class wraps a Pair&lt;CharSequence, Integer&gt; so that each word and its count travel together in the reducer's output key. The reducer's output value is NullWritable because both the word and its count are already carried in the key part, so no separate value is needed.
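Stripped of the Hadoop and Avro plumbing, the tokenize-then-sum logic that the mapper and reducer implement can be sketched in plain Java. WordCountSketch is a hypothetical helper class for illustration only, not part of the post's code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class WordCountSketch {

  // Mimics the mapper's whitespace tokenization and the reducer's
  // per-word summation, without any Hadoop or Avro dependencies.
  public static Map<String, Integer> count(String text) {
    Map<String, Integer> counts = new HashMap<>();
    StringTokenizer tokenizer = new StringTokenizer(text);
    while (tokenizer.hasMoreTokens()) {
      counts.merge(tokenizer.nextToken(), 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = count("count file is fine. count file");
    System.out.println(counts.get("count")); // prints 2
    System.out.println(counts.get("fine.")); // prints 1 ("fine." keeps its period)
  }
}
```

Note that, like StringTokenizer in the mapper, this splits only on whitespace, so punctuation stays attached to words ("fine." and "fine" are counted separately).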

  • Compile the program, providing a classpath that contains the avro-mapred-1.7.4-hadoop2.jar and avro-1.7.4.jar files, then build a jar from the generated classes.
$ javac -classpath $CLASSPATH -d avromr MapReduceAvroWordCount.java
$ jar -cvf avromr.jar -C avromr/ .

[Screenshot: MR compilation]

In the above screenshot, the compiled classes are stored in the avromr directory, which is then used to build the avromr.jar file.

  • Create a sample input file and copy it to HDFS. Below are the contents of our input file.
hadoop1@ubuntu-1:~$ hadoop fs -cat /in/test.txt
This is a test file created for examining the Avro Mapreduce word count program. count file is fine.
  • Run the MapReduce program with the command below.
$ hadoop jar avromr.jar MapReduceAvroWordCount /in/test.txt /out/mr

[Screenshot: avro mapreduce wc run]

  • Verify the output of the above MapReduce job.
$ hadoop fs -ls -R /out/mr
$ hadoop fs -cat /out/mr/part-r-00000

[Screenshot: avro mr job output]

In the Avro output file we can observe the key-value pairs: the key part is a string and the value part is an integer.
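Because the output is a binary Avro data file, it can also be inspected in a readable form with the avro-tools jar. The sketch below assumes avro-tools-1.7.4.jar has been downloaded to the current directory:

```shell
# Copy the Avro output file from HDFS to the local file system.
hadoop fs -get /out/mr/part-r-00000 .

# Dump each Avro record as one JSON object per line.
java -jar avro-tools-1.7.4.jar tojson part-r-00000
```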

With that, we have successfully run the classic MapReduce word count program with Avro-format output in a Hadoop environment.

The post Avro MapReduce Word Count Example appeared first on Hadoop Online Tutorials.
