Hadoop MapReduce Reduce-Side Join with Distributed Cache




Hi,

Today we will see an example of a Hadoop MapReduce reduce-side join using the distributed cache.

I am writing Java code for a reduce-side join across three input files, one of which is placed in the distributed cache to perform the join.

Step 1:

In the UserFileMapper.java file I process the UserDetails.txt file.

Step 2:

In the DeliverFileMapper.java file I process the DeliveryDetails.txt file.

Step 3:

In the SmsReducer.java file I join the records emitted by the two mappers above. DeliveryStatusCodes.txt is placed in the distributed cache since it is a small file. The reducer then processes all three inputs to produce the desired results below.

Expected Output
Jim, Delivered
Tom, Pending
Harry, Failed
Richa, Resend

 

Sample Inputs

File 1 – UserDetails.txt
123456, Jim
456123, Tom
789123, Harry
789456, Richa

File 2 – DeliveryDetails.txt
123456, 001
456123, 002
789123, 003
789456, 004

File 3 – DeliveryStatusCodes.txt
001, Delivered
002, Pending
003, Failed
004, Resend


The UserFileMapper class processes the UserDetails.txt file.

[code]

package com.hadoop.reducejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UserFileMapper extends Mapper<LongWritable, Text, Text, Text> {

    // variables to process the user details;
    // note the "~" in the tag so the reducer can split the tag from the value
    private String cellNumber, customerName, fileTag = "CD~";

    /* map method that processes UserDetails.txt and frames the initial key-value pairs
     * Key (Text)   - mobile number
     * Value (Text) - an identifier for the input source ("CD~" for the user details file) + customer name
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // take one line/record at a time and parse it into a key-value pair
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        customerName = splitarray[1].trim();

        // send the key-value pair out of the mapper
        context.write(new Text(cellNumber), new Text(fileTag + customerName));
    }
}

[/code]
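To sanity-check the tag convention, here is a minimal MRUnit sketch (MRUnit and JUnit on the classpath, and the test class name, are assumptions for illustration, not part of the original project). It verifies that an input line such as "123456, Jim" is emitted as key "123456" with the tagged value "CD~Jim", which is exactly what SmsReducer later splits on "~".

[code]

package com.hadoop.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

// hypothetical test class, shown only to illustrate the "CD~" tagging
public class UserFileMapperTest {

    @Test
    public void mapEmitsTaggedCustomerName() throws Exception {
        // feed a single UserDetails.txt line through the mapper and
        // assert that the value carries the "CD~" source tag
        MapDriver.newMapDriver(new UserFileMapper())
                .withInput(new LongWritable(0), new Text("123456, Jim"))
                .withOutput(new Text("123456"), new Text("CD~Jim"))
                .runTest();
    }
}

[/code]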

 

 

The DeliverFileMapper class processes the DeliveryDetails.txt file.

[code]

package com.hadoop.reducejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DeliverFileMapper extends Mapper<LongWritable, Text, Text, Text> {

    // variables to process the delivery report
    private String cellNumber, deliveryCode, fileTag = "DR~";

    /* map method that processes DeliveryDetails.txt and frames the initial key-value pairs
     * Key (Text)   - mobile number
     * Value (Text) - an identifier for the input source ("DR~" for the delivery details file) + status code
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // take one line/record at a time and parse it into a key-value pair
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        deliveryCode = splitarray[1].trim();

        // send the key-value pair out of the mapper
        context.write(new Text(cellNumber), new Text(fileTag + deliveryCode));
    }
}

[/code]
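The same MRUnit check shown for UserFileMapper applies here as well: an input line such as "123456, 001" should come out as key "123456" with the tagged value "DR~001".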

 

 

The SmsReducer class joins the records emitted by the two mappers and looks up the delivery status message from DeliveryStatusCodes.txt, which is loaded from the distributed cache.

SmsReducer.java

[code]

package com.hadoop.reducejoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SmsReducer extends Reducer<Text, Text, Text, Text> {

    // variables to aid the join process
    private String customerName, deliveryReport;
    private BufferedReader brReader;

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
    }

    /* Map to store delivery codes and messages,
     * the key being the status code and the value being the status message */
    private static HashMap<String, String> DeliveryCodesMap = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        for (Path eachPath : cacheFilesLocal) {
            // this name must match the file added to the distributed cache in the driver
            if (eachPath.getName().toString().trim().equals("DeliveryStatusCodes.txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadDeliveryStatusCodes(eachPath, context);
            }
        }
    }

    // load the delivery codes and messages into a hash map
    private void loadDeliveryStatusCodes(Path filePath, Context context) throws IOException {
        String strLineRead = "";
        try {
            brReader = new BufferedReader(new FileReader(filePath.toString()));

            // read each line, split and load into the HashMap
            while ((strLineRead = brReader.readLine()) != null) {
                String splitarray[] = strLineRead.split(",");
                DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            if (brReader != null) {
                brReader.close();
            }
        }
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // reset the per-key state so a previous key's values do not leak into this one
        customerName = null;
        deliveryReport = null;

        for (Text value : values) {
            String currValue = value.toString();
            String valueSplitted[] = currValue.split("~");

            /* identify the record source that corresponds to a cell number
             * and parse the values accordingly */
            if (valueSplitted[0].equals("CD")) {
                customerName = valueSplitted[1].trim();
            } else if (valueSplitted[0].equals("DR")) {
                // get the delivery code and use it to look up the status message
                deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
            }
        }

        // pump the final output to the file
        if (customerName != null && deliveryReport != null) {
            context.write(new Text(customerName), new Text(deliveryReport));
        } else if (customerName == null) {
            context.write(new Text("customerName"), new Text(deliveryReport));
        } else if (deliveryReport == null) {
            context.write(new Text(customerName), new Text("deliveryReport"));
        }
    }
}

[/code]
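A couple of notes on the design: the lookup table is loaded once per reducer task in setup(), not once per reduce() call, so the small DeliveryStatusCodes.txt file is parsed only one time. Resetting customerName and deliveryReport at the top of reduce() also matters; without it, leftover values from the previous key could be written out for a key that is missing one side of the join.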

Driver program to run the job

SmsDriver.java

[code]

package com.hadoop.reducejoin;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmsDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 3) {
            System.out.printf("Three parameters are required - <user details input dir> <delivery details input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Reduce-side join with text lookup file in DCache");

        // the cached file name must match the name checked for in SmsReducer.setup()
        DistributedCache.addCacheFile(new URI("/user/hadoop/joinProject/data/DeliveryStatusCodes.txt"), conf);

        job.setJarByClass(SmsDriver.class);

        // specifying the custom reducer class
        job.setReducerClass(SmsReducer.class);

        // the reducer emits Text keys and Text values, so declare them explicitly
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // specifying the input directories (at runtime) and a mapper for each input source
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, DeliverFileMapper.class);

        // specifying the output directory at runtime
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new SmsDriver(), args);
        System.exit(exitCode);
    }
}

[/code]
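For reference, here is one typical way to run the job; the jar name and HDFS paths are only assumptions chosen to match the sample data, so adjust them to your cluster:

[code]

hadoop fs -put UserDetails.txt DeliveryDetails.txt DeliveryStatusCodes.txt /user/hadoop/joinProject/data/

hadoop jar reducejoin.jar com.hadoop.reducejoin.SmsDriver \
    /user/hadoop/joinProject/data/UserDetails.txt \
    /user/hadoop/joinProject/data/DeliveryDetails.txt \
    /user/hadoop/joinProject/output

[/code]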




3 comments

  • Sai

    Hi, When I am trying to run a mapreduce job in Hue..I am getting the following error- Main class [com.demo.SmsDriver], exit code [-1] .What am I doing wrong?

  • sridhar

    This code has some issues and won't work as is:
    1) In SmsDriver add the following:
    job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
    job.setOutputValueClass(org.apache.hadoop.io.Text.class);
    If omitted, the output key class defaults to LongWritable.
    2) In UserFileMapper:
    private String cellNumber, customerName, fileTag = "CD~";

  • Akchhaya

    Not getting the delivery status in the output. Getting output as :
    Jim deliveryReport
    Tom deliveryReport
    Harry deliveryReport
    Richa deliveryReport

    It seems distributed cache is not loading the file properly, please advise.