相信接觸過搜索引擎開發的同學對倒排索引并不陌生,谷歌、百度等搜索引擎都是用的倒排索引,關于倒排索引的有關知識,這里就不再深入講解,有興趣的同學到網上了解一下。這篇博文就帶著大家一起學習下如何利用Hadoop的MR程序來實現倒排索引的功能。
一、數據準備
1、輸入文件數據
這里我們準備三個輸入文件,分別如下所示
a.txt
hello tom hello jerry hello tom
b.txt
hello jerry hello jerry tom jerry
c.txt
hello jerry hello tom
2、最終輸出文件數據
最終輸出文件的結果為:
[plain] view plain copyhello c.txt-->2 b.txt-->2 a.txt-->3 jerry c.txt-->1 b.txt-->3 a.txt-->1 tom c.txt-->1 b.txt-->1 a.txt-->2
二、倒排索引過程分析
根據輸入文件數據和最終的輸出文件結果可知,此程序需要利用兩個MR實現,具體流程可總結歸納如下:
-------------第一步Mapper的輸出結果格式如下:-------------------- context.wirte("hello->a.txt", "1") context.wirte("hello->a.txt", "1") context.wirte("hello->a.txt", "1") context.wirte("hello->b.txt", "1") context.wirte("hello->b.txt", "1") context.wirte("hello->c.txt", "1") context.wirte("hello->c.txt", "1") -------------第一步Reducer的得到的輸入數據格式如下:------------- <"hello->a.txt", {1,1,1}> <"hello->b.txt", {1,1}> <"hello->c.txt", {1,1}> -------------第一步Reducer的輸出數據格式如下--------------------- context.write("hello->a.txt", "3") context.write("hello->b.txt", "2") context.write("hello->c.txt", "2") -------------第二步Mapper得到的輸入數據格式如下:----------------- context.write("hello->a.txt", "3") context.write("hello->b.txt", "2") context.write("hello->c.txt", "2") -------------第二步Mapper輸出的數據格式如下:-------------------- context.write("hello", "a.txt->3") context.write("hello", "b.txt->2") context.write("hello", "c.txt->2") -------------第二步Reducer得到的輸入數據格式如下:----------------- <"hello", {"a.txt->3", "b.txt->2", "c.txt->2"}> -------------第二步Reducer輸出的數據格式如下:----------------- context.write("hello", "a.txt->3 b.txt->2 c.txt->2") 最終結果為: hello a.txt->3 b.txt->2 c.txt->2
三、程序開發
3.1、第一步MR程序與輸入輸出
package com.lyz.hdfs.mr.ii; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 倒排索引第一步Map Reduce程序,此處程序將所有的Map/Reduce/Runner程序放在一個類中 * @author liuyazhuang * */ public class InverseIndexStepOne { /** * 完成倒排索引第一步的mapper程序 * @author liuyazhuang * */ public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //獲取一行數據 String line = value.toString(); //切分出每個單詞 String[] fields = StringUtils.split(line, " "); //獲取數據的切片信息 FileSplit fileSplit = (FileSplit) context.getInputSplit(); //根據切片信息獲取文件名稱 String fileName = fileSplit.getPath().getName(); for(String field : fields){ context.write(new Text(field + "-->" + fileName), new LongWritable(1)); } } } /** * 完成倒排索引第一步的Reducer程序 * 最終輸出結果為: * hello-->a.txt 3 hello-->b.txt 2 hello-->c.txt 2 jerry-->a.txt 1 jerry-->b.txt 3 jerry-->c.txt 1 tom-->a.txt 2 tom-->b.txt 1 tom-->c.txt 1 * @author liuyazhuang * */ public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long counter = 0; for(LongWritable value : values){ counter += value.get(); } context.write(key, new LongWritable(counter)); } } //運行第一步的MR程序 public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(InverseIndexStepOne.class); job.setMapperClass(StepOneMapper.class); job.setReducerClass(StepOneReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii")); FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result")); job.waitForCompletion(true); } }
3.1.1 輸入數據
a.txt
hello tom hello jerry hello tom
b.txt
hello jerry hello jerry tom jerry
c.txt
hello jerry hello tom
3.1.2
輸出結果:
hello-->a.txt 3 hello-->b.txt 2 hello-->c.txt 2 jerry-->a.txt 1 jerry-->b.txt 3 jerry-->c.txt 1 tom-->a.txt 2 tom-->b.txt 1 tom-->c.txt 1
3.2 第二步MR程序與輸入輸出
package com.lyz.hdfs.mr.ii; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 倒排索引第二步Map Reduce程序,此處程序將所有的Map/Reduce/Runner程序放在一個類中 * @author liuyazhuang * */ public class InverseIndexStepTwo { /** * 完成倒排索引第二步的mapper程序 * * 從第一步MR程序中得到的輸入信息為: * hello-->a.txt 3 hello-->b.txt 2 hello-->c.txt 2 jerry-->a.txt 1 jerry-->b.txt 3 jerry-->c.txt 1 tom-->a.txt 2 tom-->b.txt 1 tom-->c.txt 1 * @author liuyazhuang * */ public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, "/t"); String[] wordAndFileName = StringUtils.split(fields[0], "-->"); String word = wordAndFileName[0]; String fileName = wordAndFileName[1]; long counter = Long.parseLong(fields[1]); context.write(new Text(word), new Text(fileName + "-->" + counter)); } } /** * 完成倒排索引第二步的Reducer程序 * 得到的輸入信息格式為: * <"hello", {"a.txt->3", "b.txt->2", "c.txt->2"}>, * 最終輸出結果如下: * hello c.txt-->2 b.txt-->2 a.txt-->3 jerry c.txt-->1 b.txt-->3 a.txt-->1 tom c.txt-->1 b.txt-->1 a.txt-->2 * @author liuyazhuang * */ public static class StepTwoReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { String result = ""; for(Text value : values){ result += value + " "; } context.write(key, new Text(result)); } } //運行第一步的MR程序 public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(InverseIndexStepTwo.class); job.setMapperClass(StepTwoMapper.class); job.setReducerClass(StepTwoReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii/result/part-r-00000")); FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result/final")); job.waitForCompletion(true); } }
3.2.1 輸入數據
hello-->a.txt 3 hello-->b.txt 2 hello-->c.txt 2 jerry-->a.txt 1 jerry-->b.txt 3 jerry-->c.txt 1 tom-->a.txt 2 tom-->b.txt 1 tom-->c.txt 1
3.2.2 輸出結果
hello c.txt-->2 b.txt-->2 a.txt-->3 jerry c.txt-->1 b.txt-->3 a.txt-->1 tom c.txt-->1 b.txt-->1 a.txt-->2
總結
以上就是本文關于Hadoop編程基于MR程序實現倒排索引示例的全部內容,希望對大家有所幫助。有什么問題可以直接留言,小編會及時回復大家的。感謝朋友們對本站的支持!
新聞熱點
疑難解答
圖片精選