@Arslan6and6
2016-08-29T02:08:58.000000Z
字数 4147
阅读 674
第四章、深入Hadoop2.x
---MapReduce 分析完整基本指标
须在作业文档中体现:
1) 理解【网站基本指标】的几个概念
2) 分析需求,依据 MapReduce 编程模板编写 PV 程序
3) 自定义计数器,对 MapReduce 进行 DEBUG 测试
PV(Page View,浏览量):页面的浏览次数,衡量网站用户访问的页面数量;用户每打开一个页面就记录一次,多次打开同一页面则浏览量累计。
UV(Unique Visitor,独立访客数):1天内访问某站点的人数(以 cookie 为依据);1天内同一访客的多次访问只计为1个访客。
VV(Visit View,访问次数):记录所有访客1天内访问了多少次您的网站;当访客完成浏览并关掉该网站的所有页面时便完成了一次访问,同一访客1天内可能有多次访问行为。
IP(独立 IP 数):指1天内使用不同IP地址的用户访问网站的数量;同一IP不管访问了几个页面,独立IP数均为1。
package org.apache.hadoop.studyhdfs;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that counts page views (PV) per province id.
 *
 * <p>Each input line is split on tab characters. The mapper emits
 * {@code (provinceId, 1)} for every line that passes validation and the
 * reducer sums the ones per province id.
 *
 * @author beifeng
 */
public class WebPvMapReduce extends Configured implements Tool {

    // step 1 : mapper

    /**
     * Emits {@code (provinceId, 1)} for every valid input line.
     *
     * <p>Type parameters follow
     * {@code Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}.
     */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        /** A line must split into at least this many fields to be counted. */
        private static final int MIN_FIELD_COUNT = 30;

        /** Index of the url field within the split line. */
        private static final int URL_INDEX = 1;

        /** Index of the province id field within the split line. */
        private static final int PROVINCE_ID_INDEX = 23;

        /** Constant map output value: one page view. */
        private static final IntWritable MAP_OUTPUT_VALUE = new IntWritable(1);

        /** Reused across calls so no new writable is allocated per record. */
        private final IntWritable mapOutputKey = new IntWritable();

        /**
         * Validates one input line and, if it is valid, writes
         * {@code (provinceId, 1)} to the context.
         *
         * <p>A line is silently skipped when it has fewer than
         * {@value #MIN_FIELD_COUNT} tab-separated fields, when the url or the
         * province id field is blank, or when the province id is not a
         * parsable {@code int}.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context task context used to emit the output pair
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] fields = lineValue.split("\t");

            // The length check MUST come before any indexed access: the
            // original read fields[23] and fields[1] first, so a short line
            // raised ArrayIndexOutOfBoundsException instead of being skipped.
            if (fields.length < MIN_FIELD_COUNT) {
                return;
            }

            String url = fields[URL_INDEX];
            if (StringUtils.isBlank(url)) {
                return;
            }

            String provinceIdValue = fields[PROVINCE_ID_INDEX];
            if (StringUtils.isBlank(provinceIdValue)) {
                return;
            }

            int provinceId;
            try {
                provinceId = Integer.parseInt(provinceIdValue);
            } catch (NumberFormatException ignored) {
                // Non-numeric province id: the record is deliberately dropped.
                return;
            }

            // Kept from the original: Integer.MAX_VALUE was its "not parsed"
            // sentinel, so a record carrying exactly that value is not counted.
            if (Integer.MAX_VALUE == provinceId) {
                return;
            }

            mapOutputKey.set(provinceId);
            context.write(mapOutputKey, MAP_OUTPUT_VALUE);
        }
    }

    // step 2 : reducer

    /** Sums the counts emitted for each province id. */
    public static class WordCountReducer
            extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        /** Reused across calls so no new writable is allocated per key. */
        private final IntWritable outputValue = new IntWritable();

        /**
         * Adds up all values for {@code key} and writes {@code (key, sum)}.
         *
         * @param key     province id
         * @param values  partial counts for this province id
         * @param context task context used to emit the output pair
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    // step 3 : job

    /**
     * Configures and runs the job: input -> map -> reduce -> output.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the
     *             output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are
     *         missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: "
                    + this.getClass().getSimpleName()
                    + " <input path> <output path>");
            return 2;
        }

        // 1 : get configuration
        Configuration configuration = super.getConf();

        // 2 : create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(WebPvMapReduce.class);

        // 3 : set job
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Optional tuning, left disabled as in the original:
        //   job.setCombinerClass(WordCountReducer.class);

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job and wait for it to finish
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Command-line entry point; exits the JVM with the status from
     * {@link #run(String[])}.
     *
     * @param args input path and output path
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        // The input/output paths can also be hard-coded here, e.g.:
        // args = new String[] {
        //     "hdfs://hadoop-senior.ibeifeng.com:8020/input/2015082818",
        //     "hdfs://hadoop-senior.ibeifeng.com:8020/output100/"
        // };

        Configuration configuration = new Configuration();

        // Map output compression can be enabled as an optimization:
        // configuration.set("mapreduce.map.output.compress", "true");

        int status = ToolRunner.run(configuration, new WebPvMapReduce(), args);
        System.exit(status);
    }
}
将Java文件打包命名为webPvMr.jar
执行程序:
[beifeng@hadoop-senior hadoop-2.5.0]$ bin/yarn jar /home/beifeng/jar/webPvMr.jar /input/2015082818 /output3