[关闭]
@Arslan6and6 2016-08-29T02:08:58.000000Z 字数 4147 阅读 602

【作业七】殷杰

第四章、深入Hadoop2.x

---MapReduce 分析完整基本指标

作业描述:

依据课堂讲解的 MapReduce 编程模板编写【分析网站基本指标 PV】程序。以下几点注意,必

须在作业文档中体现:

1) 理解【网站基本指标】的几个概念

2) 分析需求,依据 MapReduce 编程模板编程 PV 程序

3) 自定义计算器,对 MapReduce 进行 DEBUG 测试

1) 理解【网站基本指标】的几个概念

PV 即page view,浏览量。

页面的浏览次数,衡量网站用户访问的页面数量;用户每打开一个页面就记录一次,多次打开同一页面则浏览量累计。

UV 即Unique Visitor,独立访问客数

1天内访问某站点的人数(已cookie为依据);1天内同一访客的多次访问只计为1个访客。

VV 即Visitor View,访客的访问次数

记录所有访客1天内访问了多少次您的网站;当访客完成浏览并关掉该网站的所有页面时便完成了一次访问,同一访客1天内可能有多次访问行为。

IP 独立IP数

指1天内使用不同IP地址的用户访问网站的数量;同一IP不管访问了几个页面,独立IP数均为1。

2) 分析需求,依据MapReduce编程模板编程PV程序

  1. package org.apache.hadoop.studyhdfs;
  2. import java.io.IOException;
  3. import org.apache.commons.lang.StringUtils;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.conf.Configured;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.LongWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.util.Tool;
  16. import org.apache.hadoop.util.ToolRunner;
  17. /**
  18. *
  19. * @author beifeng
  20. *
  21. */
  22. public class WebPvMapReduce extends Configured implements Tool {
  23. // step 1 : mapper
  24. /**
  25. * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  26. */
  27. public static class WordCountMapper extends //
  28. Mapper<LongWritable, Text, IntWritable, IntWritable> {
  29. private IntWritable mapOutputKey = new IntWritable() ;
  30. private final static IntWritable mapOutputValue = new IntWritable(1) ;
  31. @Override
  32. public void map(LongWritable key, Text value, Context context)
  33. throws IOException, InterruptedException {
  34. // line values
  35. String lineValue = value.toString() ;
  36. // split
  37. String[] strs = lineValue.split("\t") ;
  38. // provinceIdValue
  39. String provinceIdValue = strs[23] ;
  40. // url
  41. String url = strs[1] ;
  42. // line Value length
  43. if( 30 > strs.length){
  44. return ;
  45. }
  46. // url not null
  47. if(StringUtils.isBlank(url)){
  48. return ;
  49. }
  50. // provinceIdValue not null
  51. if(StringUtils.isBlank(provinceIdValue)){
  52. return ;
  53. }
  54. // provinceIdValue --> Int
  55. int provinceId = Integer.MAX_VALUE ;
  56. try {
  57. provinceId = Integer.valueOf(provinceIdValue) ;
  58. } catch (Exception e) {
  59. return ;
  60. }
  61. if(Integer.MAX_VALUE == provinceId ){
  62. return ;
  63. }
  64. // set map output key
  65. mapOutputKey.set(provinceId);
  66. // map output
  67. context.write(mapOutputKey, mapOutputValue);
  68. }
  69. }
  70. // step 2 : reducer
  71. public static class WordCountReducer extends //
  72. Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  73. private IntWritable OutputValue = new IntWritable() ;
  74. @Override
  75. public void reduce(IntWritable key, Iterable<IntWritable> values,
  76. Context context) throws IOException, InterruptedException {
  77. // temp sum
  78. int sum = 0 ;
  79. for(IntWritable value : values){
  80. sum += value.get() ;
  81. }
  82. // set output value
  83. OutputValue.set(sum);
  84. // set output
  85. context.write(key, OutputValue);
  86. }
  87. }
  88. // step 3 : job
  89. public int run(String[] args) throws Exception {
  90. // 1 : get configuration
  91. Configuration configuration = super.getConf();
  92. // 2 : create job
  93. Job job = Job.getInstance(//
  94. configuration,//
  95. this.getClass().getSimpleName());
  96. job.setJarByClass(WebPvMapReduce.class);
  97. // 3 : set job
  98. // input --> map --> reduce --> output
  99. // 3.1 : input
  100. Path inPath = new Path(args[0]);
  101. FileInputFormat.addInputPath(job, inPath);
  102. // 3.2 : mapper
  103. job.setMapperClass(WordCountMapper.class);
  104. // TODO
  105. job.setMapOutputKeyClass(IntWritable.class);
  106. job.setMapOutputValueClass(IntWritable.class);
  107. //===========================
  108. // compress
  109. // combine
  110. //job.setCombinerClass(WordCountReducer.class);
  111. //===========================
  112. // 3.3 : reducer
  113. job.setReducerClass(WordCountReducer.class);
  114. // TODO
  115. job.setOutputKeyClass(IntWritable.class);
  116. job.setOutputValueClass(IntWritable.class);
  117. // 3.4 : output
  118. Path outPath = new Path(args[1]);
  119. FileOutputFormat.setOutputPath(job, outPath);
  120. // 4 : submit job
  121. boolean isSuccess = job.waitForCompletion(true);
  122. return isSuccess ? 0 : 1;
  123. }
  124. public static void main(String[] args) throws Exception {
  125. // 可以在代码中固定输入输出路径
  126. // args = new String[] {
  127. // "hdfs://hadoop-senior.ibeifeng.com:8020/input/2015082818" ,
  128. // "hdfs://hadoop-senior.ibeifeng.com:8020/output100/"
  129. // } ;
  130. // get configuration
  131. Configuration configuration = new Configuration();
  132. //可以配置压缩优化
  133. //configuration.set("mapreduce.map.output.compress","true") ;
  134. // run job
  135. int status = ToolRunner.run(//
  136. configuration,//
  137. new WebPvMapReduce(),//
  138. args);
  139. // exit program
  140. System.exit(status);
  141. }
  142. }

将Java文件打包命名为webPvMr.jar
执行程序:
beifeng@hadoop-senior hadoop-2.5.0]$ bin/yarn jar /home/beifeng/jar/webPvMr.jar /input/2015082818 /output3

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注