[关闭]
@Arslan6and6 2016-05-13T05:50:26.000000Z 字数 5432 阅读 651

【作业十一】殷杰

第五章、高级Hadoop2.x

---MapReduce 倒排索引

作业描述:

依据上课讲解【MapReduce执行流程】和【MapReduce高级案例:倒排索引】进行编码实现,

以下几点必须在作业中体现:

1)理解【倒排索引】功能

2)熟悉MapReduce中的Combiner功能

3)依据需求编码实现【倒排索引】功能,旨在对MapReduce的理解。

1)理解【倒排索引】功能

倒排索引是一种索引方法,根据文件内容确定文件名称、地址等信息

2)熟悉MapReduce中的Combiner功能

    MapReduce是分为Mapper任务和Reducer任务,Mapper任务的输出,通过网络传输到Reducer任务端,作为输入。
    在Reducer任务中,通常做的事情是对数据进行归约处理。既然数据来源是Mapper任务的输出,那么是否可以在Mapper端对数据进行归约处理,业务逻辑与Reducer端做的完全相同。处理后的数据再传送到Reducer端,再做一次归约。这样的好处是减少了网络传输的数量。
    在Mapper进行归约后,数据量变小了,这样再通过网络传输时,传输时间就变短了,减少了整个作业的运行时间。
    Reducer端接收的数据就是来自于Mapper端。我们在Mapper进行归约处理,无非就是把归约操作提前到Mapper端做而已。
    Mapper端的数据仅仅是本节点处理的数据,而Reducer端处理的数据是来自于多个Mapper任务的输出。因此在Mapper不能归约的数据,在Reducer端有可能归约处理。
    在Mapper进行归约的类称为Combiner。是继承自Reducer类的自定义类。job.setCombinerClass(InvertedIndexCombiner.class); public static class InvertedIndexCombiner extends Reducer<T>{...} 。类名InvertedIndexCombiner为自定义。
    要注意的是,Combiner只在Mapper任务所在的节点运行,不会跨Mapper任务运行。Reduce端接收所有Mapper端的输出来作为输入。虽然两边的归约类是同一个,但是执行的位置完全不一样。
    并不是所有的归约工作都可以使用Combiner来做。比如求平均值就不能使用Combiner。因为对于平均数的归约算法不能多次调用。

3)依据需求编码实现【倒排索引】功能,旨在对MapReduce的理解

以案例说明
案例文件内容为
QQ截图20160513105402.jpg-10.9kB
需要统计出每个单词在各个URL出现的次数,如下图所示
01.jpg-22.4kB

  1. package org.apache.hadoop.index;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Mapper.Context;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.util.Tool;
  16. import org.apache.hadoop.util.ToolRunner;
  17. /**
  18. *
  19. * @author beifeng
  20. *
  21. */
  22. public class InvertedIndexMapReduce extends Configured implements Tool {
  23. // step 1 : mapper
  24. /**
  25. * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  26. */
  27. public static class WordCountMapper extends //
  28. Mapper<LongWritable, Text, Text, Text> {
  29. private Text mapOutputKey = new Text();
  30. private Text mapOutputValue = new Text("1");
  31. @Override
  32. public void map(LongWritable key, Text value, Context context)
  33. throws IOException, InterruptedException {
  34. // split1 分割每行内容,分割后lines[0]为url,lines[1]为入 url后所有内容
  35. String[] lines = value.toString().split("##");
  36. // get url
  37. String url = lines[0];
  38. // split2 将"the apache hadoop"以空格分割
  39. String[] strs = lines[1].split(" ");
  40. //遍历[the, apache, hadoop]
  41. for (String str : strs) {
  42. mapOutputKey.set(str + "," + url);
  43. context.write(mapOutputKey, mapOutputValue);
  44. // mapOutputKey为
  45. // the,url-01 apache,url-01 hadoop,url-01
  46. // apache,url-02 framwor,url-02 hadoop,url-02
  47. // the,url-03 commmon,url-03 apache,url-03
  48. // apache,url-01 the,url-01 hadoop,url-01
  49. // apache,url-02 frmework,url-02
  50. // mapOutputValue为1
  51. }
  52. }
  53. }
  54. //
  55. // set combiner class
  56. public static class InvertedIndexCombiner extends //
  57. Reducer<Text, Text, Text, Text> {
  58. private Text CombinerOutputKey = new Text();
  59. private Text CombinerOutputValue = new Text();
  60. @Override
  61. public void reduce(Text key, Iterable<Text> values, Context context)
  62. throws IOException, InterruptedException {
  63. // split 以逗号分割"the,url-01" [the, url-01]
  64. String[] strs = key.toString().split(",");
  65. // set key
  66. CombinerOutputKey.set(strs[0] + "\n");
  67. // set value mapOutputValue输入的value为1
  68. //将该行the为key的value加总
  69. int sum = 0;
  70. for (Text value : values) {
  71. sum += Integer.valueOf(value.toString());
  72. }
  73. //Combiner输出的key为the value设置为url-01:1
  74. CombinerOutputValue.set(strs[1] + ":" + sum);
  75. context.write(CombinerOutputKey, CombinerOutputValue);
  76. // combiner合并后输出
  77. // <the, url-01:2> <the, url-03:1>
  78. // <apache, url-01:1> <apache, url-02:2>
  79. // <hadoop, url-01:2> <hadoop, url-02:1>
  80. // ...
  81. }
  82. }
  83. // step 2 : reducer 由于combiner已经计算好各单词即key在各url出现的次数
  84. // Reducer只需按照key把对应的value拼接即完成第二次合并。
  85. public static class WordCountReducer extends //
  86. Reducer<Text, Text, Text, Text> {
  87. private Text outputValue = new Text();
  88. @Override
  89. public void reduce(Text key, Iterable<Text> values, Context context)
  90. throws IOException, InterruptedException {
  91. // TODO
  92. String result = new String();
  93. for (Text value : values) {
  94. result += value.toString() + "\t";
  95. }
  96. outputValue.set(result);
  97. context.write(key, outputValue);
  98. }
  99. }
  100. // step 3 : job
  101. public int run(String[] args) throws Exception {
  102. // 1 : get configuration
  103. Configuration configuration = super.getConf();
  104. // 2 : create job
  105. Job job = Job.getInstance(//
  106. configuration,//
  107. this.getClass().getSimpleName());
  108. job.setJarByClass(InvertedIndexMapReduce.class);
  109. // job.setNumReduceTasks(tasks);
  110. // 3 : set job
  111. // input --> map --> reduce --> output
  112. // 3.1 : input
  113. Path inPath = new Path(args[0]);
  114. FileInputFormat.addInputPath(job, inPath);
  115. // 3.2 : mapper
  116. job.setMapperClass(WordCountMapper.class);
  117. // TODO
  118. job.setMapOutputKeyClass(Text.class);
  119. job.setMapOutputValueClass(Text.class);
  120. // ====================shuffle==========================
  121. // 1: partition
  122. // job.setPartitionerClass(cls);
  123. // 2: sort
  124. // job.setSortComparatorClass(cls);
  125. // 3: combine
  126. job.setCombinerClass(InvertedIndexCombiner.class);
  127. // 4: compress
  128. // set by configuration
  129. // 5 : group
  130. // job.setGroupingComparatorClass(cls);
  131. // ====================shuffle==========================
  132. // 3.3 : reducer
  133. job.setReducerClass(WordCountReducer.class);
  134. // TODO
  135. job.setOutputKeyClass(Text.class);
  136. job.setOutputValueClass(Text.class);
  137. // 3.4 : output
  138. Path outPath = new Path(args[1]);
  139. FileOutputFormat.setOutputPath(job, outPath);
  140. // 4 : submit job
  141. boolean isSuccess = job.waitForCompletion(true);
  142. return isSuccess ? 0 : 1;
  143. }
  144. public static void main(String[] args) throws Exception {
  145. args = new String[] {
  146. "hdfs://hadoop-senior.ibeifeng.com:8020/input/index.txt",
  147. "hdfs://hadoop-senior.ibeifeng.com:8020/output6/"
  148. };
  149. // get configuration
  150. Configuration configuration = new Configuration();
  151. // configuration.set(name, value);
  152. // run job
  153. int status = ToolRunner.run(//
  154. configuration,//
  155. new InvertedIndexMapReduce(),//
  156. args);
  157. // exit program
  158. System.exit(status);
  159. }
  160. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注