@Arslan6and6
2016-08-29T02:08:58.000000Z
Chapter 4: Hadoop 2.x in Depth
---MapReduce Analysis of the Basic Website Metrics
The assignment write-up must cover:
1) Understanding the concepts behind the basic website metrics
2) Analyzing the requirements and writing a PV program based on the MapReduce programming template
3) Defining custom counters and DEBUG-testing the MapReduce job (a counter sketch follows the full program below)
PV (Page View): the number of pages viewed, measuring how many pages the site's users open; every page open is recorded once, and repeated opens of the same page all add to the count.
UV (Unique Visitor): the number of people who visit the site within one day, identified by cookie; multiple visits from the same visitor within one day count as a single visitor.
VV (Visit View): the number of visits all visitors make to the site within one day; a visit ends once the visitor closes every page of the site, and the same visitor may visit several times in one day.
IP (Independent IP): the number of distinct IP addresses used to access the site within one day; no matter how many pages one IP opens, it counts as a single independent IP.
The PV program below (requirement 2) counts page views per province from tab-separated log lines, where column 23 (0-based) holds the province id and column 1 holds the URL.
package org.apache.hadoop.studyhdfs;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author beifeng
*
*/
public class WebPvMapReduce extends Configured implements Tool {

    // step 1 : mapper
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper extends
            Mapper<LongWritable, Text, IntWritable, IntWritable> {

        private IntWritable mapOutputKey = new IntWritable();
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split into tab-separated fields
            String[] strs = lineValue.split("\t");
            // check the field count before indexing into the array
            if (strs.length < 30) {
                return;
            }
            // province id (column 23, 0-based)
            String provinceIdValue = strs[23];
            // url (column 1, 0-based)
            String url = strs[1];
            // url not blank
            if (StringUtils.isBlank(url)) {
                return;
            }
            // province id not blank
            if (StringUtils.isBlank(provinceIdValue)) {
                return;
            }
            // provinceIdValue --> int
            int provinceId = Integer.MAX_VALUE;
            try {
                provinceId = Integer.valueOf(provinceIdValue);
            } catch (Exception e) {
                return;
            }
            if (Integer.MAX_VALUE == provinceId) {
                return;
            }
            // set map output key
            mapOutputKey.set(provinceId);
            // map output: (provinceId, 1)
            context.write(mapOutputKey, mapOutputValue);
        }
    }
    // step 2 : reducer
    public static class WordCountReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // sum the 1s emitted for this province id
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // set output value
            outputValue.set(sum);
            // emit (provinceId, pv)
            context.write(key, outputValue);
        }
    }
    // step 3 : job
    public int run(String[] args) throws Exception {
        // 1 : get configuration
        Configuration configuration = super.getConf();
        // 2 : create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(WebPvMapReduce.class);
        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        // map output key/value types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        //===========================
        // compress
        // combine
        //job.setCombinerClass(WordCountReducer.class);
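        // The reducer can safely double as a combiner here because summing is
        // commutative and associative; enabling it pre-aggregates map output
        // and reduces the data shuffled to the reducer.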
        //===========================
        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        // job output key/value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // 4 : submit job and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        // the input/output paths can also be hard-coded instead of passed as arguments
        // args = new String[] {
        //     "hdfs://hadoop-senior.ibeifeng.com:8020/input/2015082818",
        //     "hdfs://hadoop-senior.ibeifeng.com:8020/output100/"
        // };
        // get configuration
        Configuration configuration = new Configuration();
        // optional optimization: compress the map output
        //configuration.set("mapreduce.map.output.compress", "true");
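        // a codec can be chosen as well, e.g. Snappy (assuming the codec is
        // available on the cluster):
        //configuration.set("mapreduce.map.output.compress.codec",
        //        "org.apache.hadoop.io.compress.SnappyCodec");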
        // run the job through ToolRunner
        int status = ToolRunner.run(configuration, new WebPvMapReduce(), args);
        // exit with the job status
        System.exit(status);
    }
}
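Requirement 3 calls for custom counters to DEBUG-test the job, which the listing above does not yet include. Below is a minimal sketch of how the mapper could count why records are skipped; the counter group "WEB_PV", the counter names, and the class name WebPvCounterMapper are illustrative assumptions rather than part of the original program. The counter values appear in the summary printed after job.waitForCompletion(true) returns.
package org.apache.hadoop.studyhdfs;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WebPvCounterMapper extends
        Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private IntWritable mapOutputKey = new IntWritable();
    private final static IntWritable mapOutputValue = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] strs = value.toString().split("\t");
        if (strs.length < 30) {
            // count malformed lines instead of dropping them silently
            context.getCounter("WEB_PV", "LINE_TOO_SHORT").increment(1);
            return;
        }
        String url = strs[1];
        String provinceIdValue = strs[23];
        if (StringUtils.isBlank(url)) {
            context.getCounter("WEB_PV", "URL_BLANK").increment(1);
            return;
        }
        if (StringUtils.isBlank(provinceIdValue)) {
            context.getCounter("WEB_PV", "PROVINCE_ID_BLANK").increment(1);
            return;
        }
        try {
            mapOutputKey.set(Integer.parseInt(provinceIdValue));
        } catch (NumberFormatException e) {
            context.getCounter("WEB_PV", "PROVINCE_ID_NOT_A_NUMBER").increment(1);
            return;
        }
        context.write(mapOutputKey, mapOutputValue);
    }
}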
Package the compiled Java classes into a jar named webPvMr.jar.
Run the program:
[beifeng@hadoop-senior hadoop-2.5.0]$ bin/yarn jar /home/beifeng/jar/webPvMr.jar /input/2015082818 /output3
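Once the job succeeds, the per-province PV counts can be read back from the output directory; part-r-00000 is the default file name written by a single reducer:
[beifeng@hadoop-senior hadoop-2.5.0]$ bin/hdfs dfs -cat /output3/part-r-00000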