This post is about performing Sentiment Analysis on Twitter data using MapReduce. We will use the distributed cache to join each tweet with a sentiment dictionary.
What does distributed cache do here?
The distributed cache lets us perform map-side joins. Here, we will join the tweets with a dictionary dataset that contains a sentiment value for each word. The dictionary we will use is called AFINN.
AFINN is a word list of roughly 2,500 English words and phrases, each rated with an integer score between -5 (very negative) and +5 (very positive) depending on its meaning.
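For reference, entries in the AFINN file are tab-separated, one word and its rating per line. The few lines below are only illustrative; the exact words and ratings come from the file you download.

abandon	-2
bad	-3
good	3
love	3
hate	-3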
Here is the complete code to perform Sentiment Analysis using MapReduce.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class Sentiment_Analysis extends Configured implements Tool {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        private URI[] files;
        private HashMap<String, String> AFINN_map = new HashMap<String, String>();

        @Override
        public void setup(Context context) throws IOException {
            // Locate the cached AFINN file and load it into an in-memory HashMap.
            files = DistributedCache.getCacheFiles(context.getConfiguration());
            System.out.println("files:" + files);
            Path path = new Path(files[0]);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FSDataInputStream in = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String line = "";
            while ((line = br.readLine()) != null) {
                // Each dictionary line is: word <TAB> rating
                String[] splits = line.split("\t");
                AFINN_map.put(splits[0], splits[1]);
            }
            br.close();
            in.close();
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            JSONParser jsonParser = new JSONParser();
            try {
                // Each input line is one tweet in JSON format.
                JSONObject obj = (JSONObject) jsonParser.parse(line);
                String tweet_id = (String) obj.get("id_str");
                String tweet_text = (String) obj.get("text");

                // Sum the AFINN rating of every word that appears in the dictionary.
                String[] splits = tweet_text.split(" ");
                int sentiment_sum = 0;
                for (String word : splits) {
                    if (AFINN_map.containsKey(word)) {
                        Integer x = Integer.valueOf(AFINN_map.get(word));
                        sentiment_sum += x;
                    }
                }
                context.write(new Text(tweet_id),
                        new Text(tweet_text + "\t----->\t" + Integer.toString(sentiment_sum)));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reducer: emit every (tweet_id, text + score) pair unchanged.
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Sentiment_Analysis(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: Sentiment_Analysis <in> <out>");
            System.exit(2);
        }
        // Register the AFINN dictionary (already present in HDFS) with the distributed cache.
        DistributedCache.addCacheFile(new URI("/AFINN.txt"), conf);
        Job job = new Job(conf, "SentimentAnalysis");
        job.setJarByClass(Sentiment_Analysis.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
How did we achieve this?
Here are the four simple steps we have followed to perform Sentiment Analysis:
1. Implementing Distributed Caching
2. Writing a mapper class to calculate the sentiments
3. Writing a reducer class to display all the mapper output
4. Writing a Driver class for our MapReduce program
1. Implementing Distributed Caching
In MapReduce, map-side joins are carried out using the distributed cache. The distributed cache is used when we are joining two datasets and the smaller one is small enough to fit in the memory of every node in the cluster. Here, the AFINN dictionary is the smaller dataset, so we distribute it through the cache. Here is the implementation of the distributed cache.
private URI[] files;
private HashMap<String, String> AFINN_map = new HashMap<String, String>();

@Override
public void setup(Context context) throws IOException {
    // Locate the cached AFINN file and load it into an in-memory HashMap.
    files = DistributedCache.getCacheFiles(context.getConfiguration());
    System.out.println("files:" + files);
    Path path = new Path(files[0]);
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataInputStream in = fs.open(path);
    BufferedReader br = new BufferedReader(new InputStreamReader(in));
    String line = "";
    while ((line = br.readLine()) != null) {
        // Each dictionary line is: word <TAB> rating
        String[] splits = line.split("\t");
        AFINN_map.put(splits[0], splits[1]);
    }
    br.close();
    in.close();
}
Initially, we have declared a HashMap named AFINN_map to hold each dictionary word as the key and its rating as the value. You can download the dataset from here.
The data is tab-separated, containing the word and its rating. We open the file with an FSDataInputStream, read it line by line using the readLine() method, split each line on the tab delimiter, and store the word and its rating in the AFINN_map HashMap.
files = DistributedCache.getCacheFiles(context.getConfiguration());
The above line retrieves the URIs of the files that were placed in the distributed cache. With that, the cached dictionary is read and loaded successfully, and the distributed cache implementation is complete.
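Note that the DistributedCache class is deprecated in Hadoop 2.x. If you are on a newer release, the same thing can be done through the Job and Context APIs; here is a minimal sketch (not part of the program above) of the equivalent calls:

// In the driver, instead of DistributedCache.addCacheFile(...):
job.addCacheFile(new URI("/AFINN.txt"));

// In the mapper's setup(), instead of DistributedCache.getCacheFiles(...):
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);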
2. Writing a Map Method to Calculate the Sentiments
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    JSONParser jsonParser = new JSONParser();
    try {
        // Parse the tweet JSON and pull out the id and the text.
        JSONObject obj = (JSONObject) jsonParser.parse(line);
        String tweet_id = (String) obj.get("id_str");
        String tweet_text = (String) obj.get("text");

        // Add up the AFINN rating of every word found in the dictionary.
        String[] splits = tweet_text.split(" ");
        int sentiment_sum = 0;
        for (String word : splits) {
            if (AFINN_map.containsKey(word)) {
                Integer x = Integer.valueOf(AFINN_map.get(word));
                sentiment_sum += x;
            }
        }
        context.write(new Text(tweet_id),
                new Text(tweet_text + "\t----->\t" + Integer.toString(sentiment_sum)));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
The map method takes each record as input and converts it into a string using the toString() method. After this, we create a JSONParser object called jsonParser, which parses each record, since every record is in JSON format.
Next, we extract the tweet_id and the tweet_text, which are required for sentiment analysis, as shown below.
JSONObject obj = (JSONObject) jsonParser.parse(line);
String tweet_id = (String) obj.get("id_str");
String tweet_text = (String) obj.get("text");
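Each input line is expected to be one complete tweet in JSON format, as produced by the Flume Twitter source. A minimal, made-up record showing just the two fields this job reads would look like this:

{"id_str":"123456789012345678","text":"love hadoop"}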
Now, we split the tweet_text into words and loop over all the words in the tweet_text. For each word, we look it up in the AFINN dictionary (this lookup is the map-side join); if there is a match, we take the rating of the word and add it to the running total, as shown below.
String[] splits = tweet_text.split(" ");
int sentiment_sum = 0;
for (String word : splits) {
    if (AFINN_map.containsKey(word)) {
        Integer x = Integer.valueOf(AFINN_map.get(word));
        sentiment_sum += x;
    }
}
context.write(new Text(tweet_id),
        new Text(tweet_text + "\t----->\t" + Integer.toString(sentiment_sum)));
Finally, we write the tweet_id as the key and the combination of the tweet_text and the sentiment_sum as the value to the context.
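To make the scoring concrete, here is a small standalone sketch of the same loop with a hard-coded two-word dictionary; the words and ratings here are only illustrative stand-ins for the real AFINN entries.

import java.util.HashMap;

public class SentimentSumExample {
    public static void main(String[] args) {
        // Illustrative ratings; in the real job these come from the AFINN file.
        HashMap<String, String> afinnMap = new HashMap<String, String>();
        afinnMap.put("love", "3");
        afinnMap.put("bad", "-3");

        String tweetText = "i love hadoop but bad network";
        int sentimentSum = 0;
        for (String word : tweetText.split(" ")) {
            if (afinnMap.containsKey(word)) {
                sentimentSum += Integer.parseInt(afinnMap.get(word));
            }
        }
        // 3 + (-3) = 0
        System.out.println(tweetText + "\t----->\t" + sentimentSum);
    }
}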
3. Writing a Reducer Class to Display All the Mapper Output
public static class Reduce extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Pass every mapper output record straight through to the job output.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
In the reducer class, we simply pass the input received from the mapper through as the output.
4. Writing a Driver Class for Our MapReduce Program
public static void main(String[] args) throws Exception {
    ToolRunner.run(new Sentiment_Analysis(), args);
}

@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length != 2) {
        System.err.println("Usage: Sentiment_Analysis <in> <out>");
        System.exit(2);
    }
    // Register the AFINN dictionary (already present in HDFS) with the distributed cache.
    DistributedCache.addCacheFile(new URI("/AFINN.txt"), conf);
    Job job = new Job(conf, "SentimentAnalysis");
    job.setJarByClass(Sentiment_Analysis.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}
In the Driver class, we need to provide the path for the cached dataset, using the below line.
DistributedCache.addCacheFile(new URI("/AFINN.txt"), conf);
In addition, we need to provide the input path (the tweets folder) and the output folder path as arguments.
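Note that the URI above assumes the AFINN.txt file is already present at the root of HDFS. If it is only on your local machine, you can copy it there first with something like:

hadoop fs -put AFINN.txt /AFINN.txt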
How to run our MapReduce program?
In order to run this program, we need to build a jar file of the above project and run it using the normal hadoop jar command, as shown below.
hadoop jar twitter.jar /user/flume/tweets /user/flume/tweets/output
The output will be written to the part file /user/flume/tweets/output/part-r-00000, and the result looks like the sample shown below.
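Each line of the part file holds the tweet id, the tweet text, and the computed score, separated by tabs. An illustrative line (made-up id and text) would look like:

123456789012345678	love hadoop	----->	3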
We hope this post has been helpful in understanding how to perform Sentiment Analysis using Map Reduce. In the case of any queries, feel free to comment below and we will get back to you at the earliest.
Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.