In this post, we will look at how to implement a custom input format in Hadoop. To demonstrate, we take the Titanic data set as an example and implement the following problem statement.
Problem Statement:
Find the number of people who died and who survived, broken down by gender.
Data Set Description:
The dataset description is as follows:
Column 1: PassengerId
Column 2: Survived (0 = died, 1 = survived)
Column 3: Pclass
Column 4: Name
Column 5: Sex
Column 6: Age
Column 7: SibSp
Column 8: Parch
Column 9: Ticket
Column 10: Fare
Column 11: Cabin
Column 12: Embarked
You can download the data set from the below link:
Here, we need to implement a custom key that is a combination of two columns: the 2nd column (Survived), which tells whether the passenger died or survived, and the 5th column (Sex), which contains the passenger's gender. So, let's prepare a custom key by combining these two columns and sort the keys by the gender column.
To begin with, we need to prepare our custom key. To prepare a custom key, we need to implement the WritableComparable interface. Below is the source code for the custom key.
Custom Key
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.*;

import com.google.common.collect.ComparisonChain;

public class Key_value implements WritableComparable<Key_value> {

    private String x; // Survived column (2nd column)
    private String y; // Sex column (5th column)

    public Key_value() {
    }

    public Key_value(String x, String y) {
        this.x = x;
        this.y = y;
    }

    public String getX() {
        return x;
    }

    public void setX(String x) {
        this.x = x;
    }

    public String getY() {
        return y;
    }

    public void setY(String y) {
        this.y = y;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(x);
        out.writeUTF(y);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readUTF();
        y = in.readUTF();
    }

    @Override
    public int compareTo(Key_value o) {
        // Sort by the gender column first, then by the survived column
        return ComparisonChain.start()
                .compare(this.y, o.y)
                .compare(this.x, o.x)
                .result();
    }

    @Override
    public boolean equals(Object o1) {
        if (!(o1 instanceof Key_value)) {
            return false;
        }
        Key_value other = (Key_value) o1;
        // Compare the string contents, not the object references
        return this.x.equals(other.x) && this.y.equals(other.y);
    }

    @Override
    public int hashCode() {
        // Equal keys must produce the same hash so they reach the same reducer
        return x.hashCode() * 31 + y.hashCode();
    }

    @Override
    public String toString() {
        return x + "," + y;
    }
}
In the compareTo method, we have written the logic to sort the keys by the gender column. We use the ComparisonChain class to compare the gender column first and then the survived column. Therefore, the keys will come out sorted by the gender column.
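To make the ordering concrete, here is a minimal standalone sketch (the KeySortDemo class name is ours, not part of the original job) that exercises compareTo directly:

// Demonstrates the sort order produced by compareTo: gender first, then the survived flag.
public class KeySortDemo {
    public static void main(String[] args) {
        Key_value a = new Key_value("0", "female");
        Key_value b = new Key_value("1", "female");
        Key_value c = new Key_value("0", "male");

        System.out.println(a.compareTo(b) < 0); // true: same gender, "0" sorts before "1"
        System.out.println(b.compareTo(c) < 0); // true: "female" sorts before "male"
    }
}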
Note: If compareTo considers only one of the two columns, then keys that differ only in the other column are treated as equal during sorting and grouping, and their counts will be merged.
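For illustration, a hypothetical variant that replaces compareTo in Key_value with a single-column comparison would look like the sketch below; with it, keys such as 0,female and 1,female would be grouped together, leaving only one count per gender:

// Hypothetical variant: compare only the gender field.
// Keys that differ only in the survived flag would then be grouped as one key.
@Override
public int compareTo(Key_value o) {
    return this.y.compareTo(o.y);
}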
Now that we have written a custom key, we need to write an InputFormat class that extends the default FileInputFormat.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class Titanic_input extends FileInputFormat<Key_value, IntWritable> {

    @Override
    public RecordReader<Key_value, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Hand each input split to our custom record reader
        return new MyRecordReader();
    }
}
Here, we implement the custom input format by extending the default FileInputFormat, parameterized with our custom key class (Key_value) as the key type and IntWritable as the value type.
The InputFormat delegates the actual parsing of the input to a RecordReader. The custom RecordReader class is as follows.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyRecordReader extends RecordReader<Key_value, IntWritable> {

    private Key_value key;
    private IntWritable value;
    private final LineRecordReader reader = new LineRecordReader();

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        reader.initialize(is, tac);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean gotNextKeyValue = reader.nextKeyValue();
        if (gotNextKeyValue) {
            if (key == null) {
                key = new Key_value();
            }
            if (value == null) {
                value = new IntWritable();
            }
            // Split the CSV line and build the composite key from the
            // Survived (2nd) and Sex (5th) columns
            Text line = reader.getCurrentValue();
            String[] tokens = line.toString().split(",");
            key.setX(tokens[1]);
            key.setY(tokens[4]);
            value.set(1);
        } else {
            key = null;
            value = null;
        }
        return gotNextKeyValue;
    }

    @Override
    public Key_value getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public IntWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
In the RecordReader, the nextKeyValue() method does the actual work: it reads each line of the dataset through a LineRecordReader, splits it on commas, and sets the relevant columns on our custom key as follows:
key.setX(tokens[1]);
key.setY(tokens[4]);
As discussed earlier, the 2nd (Survived) and 5th (Sex) columns are the ones placed in our custom key.
The value is set to '1' since we need to count the number of people.
value.set(1);
Now, the Mapper class is as follows:
public static class Map extends Mapper<Key_value, IntWritable, Key_value, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    public void map(Key_value key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        // Emit the composite key with a count of 1
        context.write(key, one);
    }
}
The Mapper simply emits each key and value as they are handed to it by the RecordReader.
The Reducer class is as follows:
public static class Reduce extends Reducer<Key_value, IntWritable, Key_value, IntWritable> {

    public void reduce(Key_value key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for each unique (Survived, Sex) key
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
The Reducer sums the values for each unique key, producing one count for each (Survived, Sex) combination.
The Driver class is as follows:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJarByClass(Female_survived.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // Both the map output and the final output use the custom key
    job.setMapOutputKeyClass(Key_value.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Key_value.class);
    job.setOutputValueClass(IntWritable.class);

    // Plug in the custom input format
    job.setInputFormatClass(Titanic_input.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Delete the output directory if it already exists
    Path out = new Path(args[1]);
    out.getFileSystem(conf).delete(out, true);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, out);

    job.waitForCompletion(true);
}
We need to set the InputFormatClass to our custom input format class. The map output and final output key/value classes also need to be set to match the custom key and IntWritable, otherwise the job will fail with a type mismatch error.
After running the program, the output contains one line for each (Survived, Sex) combination: the key's toString() value (for example, 0,female) followed by a tab and the count.
The keys are sorted by the gender column, as intended. We have successfully implemented a custom input format in Hadoop.
Hope this post has been helpful in understanding how to implement custom input format in Hadoop. In case of any queries, feel free to comment below and we will get back to you at the earliest.
Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.