@songlaf
2016-05-14
An inverted index lets you look up a keyword, which may appear in a key or in a value, and retrieve information about it: the records it is associated with, how often it occurs, and so on. A search engine is the classic example: given a keyword, it reports which pages contain that keyword and how frequently it appears on each.
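Before bringing in MapReduce, it may help to see the target structure in miniature. The sketch below builds an inverted index in plain Java over two lines of sample input; the class name and data here are illustrative only:

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal in-memory inverted index: word -> (url -> occurrence count).
// A tiny stand-in for what the MapReduce job below computes at scale.
public class TinyInvertedIndex {
    public static void main(String[] args) {
        String[] docs = {
            "url-01##the apache hadoop",
            "url-02##apache framework hadoop",
        };
        Map<String, Map<String, Integer>> index = new TreeMap<>();
        for (String doc : docs) {
            String[] parts = doc.split("##");            // parts[0]=url, parts[1]=text
            for (String word : parts[1].split(" ")) {
                index.computeIfAbsent(word, w -> new TreeMap<>())
                     .merge(parts[0], 1, Integer::sum);  // bump count for this url
            }
        }
        // e.g. apache {url-01=1, url-02=1}
        index.forEach((word, postings) -> System.out.println(word + " " + postings));
    }
}
```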
The following worked example traces how an inverted index is built through the Map, Combine, and Reduce phases.
The Combine step runs on the map side.
Note:
i) Map's output is Combine's input, and Combine's output is Reduce's input.
ii) Combine must not change the map output contract: the combiner's input and output key/value types have to match the mapper's output types, since the reducer consumes them as ordinary map output (a minimal type skeleton follows). Note too that Hadoop may run a combiner zero, one, or several times, so this example's combiner, which expects the raw "1" values, relies on being applied at most once per key.
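A minimal skeleton of that type contract, assuming the Text-based types used throughout this example (class names are placeholders, not part of the job):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Skeleton only: the combiner is itself a Reducer, and its input AND output
// key/value types must both equal the mapper's output types (Text, Text here),
// because the real reducer receives its output as ordinary map output.
class ContractSketch {
    static class MyMapper   extends Mapper<LongWritable, Text, Text, Text> { }
    static class MyCombiner extends Reducer<Text, Text, Text, Text> { }  // combiner
    static class MyReducer  extends Reducer<Text, Text, Text, Text> { }
}
```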
For example, take the following input data:
```
url-01##the apache hadoop
url-02##apache framework hadoop
url-03##the common apache
url-01##apache the hadoop
url-02##apache framework
```
On the Map side, the keyword and the URL are joined to form the key, and 1 is emitted as the value, so the map output looks like this (a runnable simulation follows the table):
| Key | Value |
|---|---|
| the,url-01 | 1 |
| apache,url-01 | 1 |
| hadoop,url-01 | 1 |
| ... (remaining pairs omitted) | |
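To see exactly what the map phase emits, here is a small plain-Java simulation of it over the sample input; it needs no Hadoop, and the class name is invented for illustration:

```java
// Simulates the map phase: each line "url##w1 w2 ..." emits ("word,url", 1).
public class MapPhaseSim {
    public static void main(String[] args) {
        String[] input = {
            "url-01##the apache hadoop",
            "url-02##apache framework hadoop",
            "url-03##the common apache",
            "url-01##apache the hadoop",
            "url-02##apache framework",
        };
        for (String line : input) {
            String[] parts = line.split("##");
            if (parts.length != 2) continue;   // same guard as the real mapper
            for (String word : parts[1].split(" ")) {
                System.out.println(word + "," + parts[0] + "\t1");
            }
        }
    }
}
```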
The Combiner then adds up the values that share a key, giving the total count of each word on each URL; it splits the key on the comma and emits key = word, value = url:count for the Reduce phase to consume. The combine output looks like this (a runnable simulation again follows the table):
| Key | Value |
|---|---|
| apache | url-01:2 |
| apache | url-02:2 |
| apache | url-03:1 |
| ... (remaining pairs omitted) | |
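The combine step can be simulated the same way. This sketch replays the map output keys from the sample data and performs the sum-then-split described above (again plain Java with invented names):

```java
import java.util.Map;
import java.util.TreeMap;

// Simulates the combine phase: sum the 1s per "word,url" key, then
// re-emit key=word, value="url:sum".
public class CombinePhaseSim {
    public static void main(String[] args) {
        String[] mapOutputKeys = {
            "the,url-01", "apache,url-01", "hadoop,url-01",
            "apache,url-02", "framework,url-02", "hadoop,url-02",
            "the,url-03", "common,url-03", "apache,url-03",
            "apache,url-01", "the,url-01", "hadoop,url-01",
            "apache,url-02", "framework,url-02",
        };
        Map<String, Integer> counts = new TreeMap<>();
        for (String key : mapOutputKeys) counts.merge(key, 1, Integer::sum);
        counts.forEach((key, sum) -> {
            String[] parts = key.split(",");   // parts[0]=word, parts[1]=url
            System.out.println(parts[0] + "\t" + parts[1] + ":" + sum);
        });
    }
}
```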
Reduce then merges the records handed over by Combine, producing the display format we want: for each word, it concatenates every url:count value it receives.
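To round out the walkthrough, here is a minimal plain-Java sketch of that reduce-side merge, fed with the combine output from the table above (class and variable names are my own, not from the job code):

```java
import java.util.Map;
import java.util.TreeMap;

// Simulates the reduce phase: group the combine output by word and join the
// "url:count" values with tabs, matching the job's final display format.
public class ReducePhaseSim {
    public static void main(String[] args) {
        String[][] combineOutput = {
            {"apache", "url-01:2"}, {"apache", "url-02:2"}, {"apache", "url-03:1"},
            {"common", "url-03:1"}, {"framework", "url-02:2"},
            {"hadoop", "url-01:2"}, {"hadoop", "url-02:1"},
            {"the", "url-01:2"}, {"the", "url-03:1"},
        };
        Map<String, StringBuilder> grouped = new TreeMap<>();
        for (String[] kv : combineOutput) {
            grouped.computeIfAbsent(kv[0], k -> new StringBuilder())
                   .append(kv[1]).append("\t");
        }
        grouped.forEach((word, urls) -> System.out.println(word + "\t" + urls));
    }
}
```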
Now the actual Hadoop code. First the Combiner implementation (`wordInverse`), wired in below via `job.setCombinerClass`:

```java
package njt.song.study.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Combiner: runs on the map side. Input key is "word,url" with "1" values;
// it sums the 1s, splits the key, and emits key=word, value="url:sum".
public class wordInverse extends Reducer<Text, Text, Text, Text> {

    private Text combineOutPueKey = new Text();
    private Text combineOutValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] lines = key.toString().split(",");  // lines[0]=word, lines[1]=url
        combineOutPueKey.set(lines[0]);
        int sum = 0;
        for (Text value : values) {
            sum += Integer.valueOf(value.toString());
        }
        combineOutValue.set(lines[1] + ":" + sum);
        context.write(combineOutPueKey, combineOutValue);
    }
}
```
Then the Mapper, the Reducer, and the driver:

```java
package njt.song.study.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    // Mapper: splits "url##w1 w2 ..." and emits ("word,url", "1") per word.
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outPutKey = new Text();
        private Text outPutValue = new Text("1");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split("##");
            if (lines.length != 2) {
                return;  // skip malformed lines
            }
            String url = lines[0];
            String[] words = lines[1].split(" ");
            for (String word : words) {
                outPutKey.set(word + "," + url);
                context.write(outPutKey, outPutValue);
            }
        }
    }

    // Reducer: concatenates all "url:count" values for a word, tab-separated.
    public static class WordCountReducer extends Reducer<Text, Text, Text, Text> {
        private Text outPutValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String result = "";
            for (Text value : values) {
                result += value.toString() + "\t";
            }
            outPutValue.set(result);
            context.write(key, outPutValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCount(), args);
        System.exit(status);
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = super.getConf();
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(WordCount.class);

        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(wordInverse.class);  // the Combiner defined above
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }
}
```
Package the classes into a jar and run the job on the sample input above:

```
bin/yarn jar /home/sjf/WordInverse.jar /input2 /Out20
```
The result:
```
bin/hdfs dfs -cat /Out20/part-r-00000
apache      url-01:2    url-02:2    url-03:1
common      url-03:1
framework   url-02:2
hadoop      url-01:2    url-02:1
the         url-01:2    url-03:1
```