hadoop mapreduce example with partitioner




Hello All,

In the last class we saw the simple word-count example. Now we will learn a Hadoop MapReduce example with a partitioner: how to create multiple output files by breaking the data down into groups, sending each group to a different reducer, and producing a separate output file per group.

Input Format:
name<tab>age<tab>gender<tab>salary

We will use custom partitioning in a MapReduce program to find the maximum salary for each gender in three age categories: 20 or below, 21 to 50, and greater than 50.

Input
Rajee<tab>23<tab>female<tab>5000
Rama<tab>34<tab>male<tab>7000
Arjun<tab>67<tab>male<tab>900000
Keerthi<tab>38<tab>female<tab>100000
Kishore<tab>25<tab>male<tab>23000
Daniel<tab>78<tab>male<tab>7600
James<tab>34<tab>male<tab>86000
Alex<tab>52<tab>male<tab>6900
Nancy<tab>7<tab>female<tab>9800
Adam<tab>9<tab>male<tab>3700
Jacob<tab>7<tab>male<tab>2390
Mary<tab>6<tab>female<tab>9300
Clara<tab>87<tab>female<tab>72000
Monica<tab>56<tab>female<tab>92000

Output:

Partition – 0: (this partition contains the maximum salaries for each gender whose age is 20 or less)
Nancy<tab>age- 7<tab>female<tab>salary-9800
Adam<tab>age- 9<tab>male<tab>salary-3700

Partition – 1: (this partition contains the maximum salaries for each gender whose age is between 21 and 50)

Keerthi<tab>age- 38<tab>female<tab>salary-100000
James<tab>age- 34<tab>male<tab>salary-86000

Partition – 2: (this partition contains the maximum salaries for each gender whose age is greater than 50)

Monica<tab>age- 56<tab>female<tab>salary-92000
Arjun<tab>age- 67<tab>male<tab>salary-900000

PartitionerDriver.java

[code]

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionerDriver extends Configured implements Tool{

@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, “partitioner”);

job.setJarByClass(getClass());

// configure output and input source
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(PartitionerMapper.class);
job.setPartitionerClass(AgePartitioner.class);
job.setReducerClass(PartitionerReducer.class);

// the number of reducers is set to 3, this can be altered according to
// the program’s requirements
job.setNumReduceTasks(3);

// configure output
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new PartitionerDriver(), args);
System.exit(exitCode);
}
}

[/code]

PartitionerMapper.Java

[code]

package com.hadoop;

import java.io.IOException;
//import java.util.StringTokenizer;

//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionerMapper extends Mapper<LongWritable, Text, Text, Text> {




//private final static IntWritable countOne = new IntWritable(1);
//private final Text reusableText = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
//sample record
//name<tab>age<tab>gender<tab>salary
//Raju<tab>23<tab>male<tab>5000
//Rani<tab>21<tab>female<tab>50000
String[] tokens = value.toString().split(“t”);
String gender = tokens[2].toString();
String nameAgeSalary = tokens[0]+”t”+tokens[1]+”t”+tokens[3];

//the mapper emits key, value pair where the key is the gender and the value is the other information which includes name, age and score

context.write(new Text(gender), new Text(nameAgeSalary));
}
}

[/code]

AgePartitioner.Java

[code]

package com.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//AgePartitioner is a custom Partitioner to partition the data according to age.
//The age is a part of the value from the input file.
//The data is partitioned based on the range of the age.
//In this example, there are 3 partitions, the first partition contains the information where the age is less than 20
//The second partition contains data with age ranging between 20 and 50 and the third partition contains data where the age is >50.

public class AgePartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {

String [] nameAgeSalary = value.toString().split(“t”);

String age = nameAgeSalary[1];

int ageInt = Integer.parseInt(age);
//this is done to avoid performing mod with 0
if(numReduceTasks == 0)
return 0;
//if the age is <20, assign partition 0
if(ageInt <=20){
return 0;
}
//else if the age is between 20 and 50, assign partition 1
if(ageInt >20 && ageInt <=50){
return 1 % numReduceTasks;
}
//otherwise assign partition 2
else
return 2 % numReduceTasks;

}

}

[/code]

PartitionerReducer.Java

[code]

package com.hadoop;

import java.io.IOException;

//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PartitionerReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {

int maxSalary = Integer.MIN_VALUE;
String name = ” “;
String age = ” “;
String gender = ” “;
int salary = 0;

//iterating through the values corresponding to a particular key
for(Text val: values){
String [] valTokens = val.toString().split(“t”);
salary = Integer.parseInt(valTokens[2]);
//if the new salary is greater than the current maximum salary, update the fields as they will be the output of the reducer after all the values are processed for a particular key

if(salary > maxSalary){
name = valTokens[0];
age = valTokens[1];
gender = key.toString();
maxSalary = salary;
}
}
context.write(new Text(name), new Text(“age- “+age+”t”+gender+”tscore-“+maxSalary));
}
}

[/code]

Hope this helps someone looking for a custom Partitioner example.




3 comments

  • Senthil Kumar

    Its not clear ..i could not understand clearly

  • Anand Balaji Babu

    What happen if i Have the Number of partition to 2 and I removed the else part else
    return 2 % numReduceTasks;

    Does all records >50 will not be considered for the output

  • Asd

    Example is good, but the logic is wrong. The output is not as you claim it