@Arslan6and6
2016-05-13T05:50:26.000000Z
字数 5432
阅读 651
第五章、高级Hadoop2.x
---MapReduce 倒排索引
依据上课讲解【MapReduce执行流程】和【MapReduce高级案例:倒排索引】进行编码实现,
以下几点必须在作业中体现:
倒排索引是一种索引方法,根据文件内容确定文件名称、地址等信息
MapReduce是分为Mapper任务和Reducer任务,Mapper任务的输出,通过网络传输到Reducer任务端,作为输入。
在Reducer任务中,通常做的事情是对数据进行归约处理。既然数据来源是Mapper任务的输出,那么是否可以在Mapper端对数据进行归约处理,业务逻辑与Reducer端做的完全相同。处理后的数据再传送到Reducer端,再做一次归约。这样的好处是减少了网络传输的数量。
在Mapper进行归约后,数据量变小了,这样再通过网络传输时,传输时间就变短了,减少了整个作业的运行时间。
Reducer端接收的数据就是来自于Mapper端。我们在Mapper进行归约处理,无非就是把归约操作提前到Mapper端做而已。
Mapper端的数据仅仅是本节点处理的数据,而Reducer端处理的数据是来自于多个Mapper任务的输出。因此在Mapper不能归约的数据,在Reducer端有可能归约处理。
在Mapper进行归约的类称为Combiner。是继承自Reducer类的自定义类。job.setCombinerClass(InvertedIndexCombiner.class); public static class InvertedIndexCombiner extends Reducer<T>{...} 。类名InvertedIndexCombiner为自定义。
要注意的是,Combiner只在Mapper任务所在的节点运行,不会跨Mapper任务运行。Reduce端接收所有Mapper端的输出来作为输入。虽然两边的归约类是同一个,但是执行的位置完全不一样。
并不是所有的归约工作都可以使用Combiner来做。比如求平均值就不能使用Combiner。因为对于平均数的归约算法不能多次调用。
以案例说明
案例文件内容为
需要统计出每个单词在各个URL出现的次数,如下图所示
package org.apache.hadoop.index;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author beifeng
*
*/
public class InvertedIndexMapReduce extends Configured implements Tool {
// step 1 : mapper
/**
* public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*/
public static class WordCountMapper extends //
Mapper<LongWritable, Text, Text, Text> {
private Text mapOutputKey = new Text();
private Text mapOutputValue = new Text("1");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// split1 分割每行内容,分割后lines[0]为url,lines[1]为入 url后所有内容
String[] lines = value.toString().split("##");
// get url
String url = lines[0];
// split2 将"the apache hadoop"以空格分割
String[] strs = lines[1].split(" ");
//遍历[the, apache, hadoop]
for (String str : strs) {
mapOutputKey.set(str + "," + url);
context.write(mapOutputKey, mapOutputValue);
// mapOutputKey为
// the,url-01 apache,url-01 hadoop,url-01
// apache,url-02 framwor,url-02 hadoop,url-02
// the,url-03 commmon,url-03 apache,url-03
// apache,url-01 the,url-01 hadoop,url-01
// apache,url-02 frmework,url-02
// mapOutputValue为1
}
}
}
//
// set combiner class
public static class InvertedIndexCombiner extends //
Reducer<Text, Text, Text, Text> {
private Text CombinerOutputKey = new Text();
private Text CombinerOutputValue = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// split 以逗号分割"the,url-01" [the, url-01]
String[] strs = key.toString().split(",");
// set key
CombinerOutputKey.set(strs[0] + "\n");
// set value mapOutputValue输入的value为1
//将该行the为key的value加总
int sum = 0;
for (Text value : values) {
sum += Integer.valueOf(value.toString());
}
//Combiner输出的key为the value设置为url-01:1
CombinerOutputValue.set(strs[1] + ":" + sum);
context.write(CombinerOutputKey, CombinerOutputValue);
// combiner合并后输出
// <the, url-01:2> <the, url-03:1>
// <apache, url-01:1> <apache, url-02:2>
// <hadoop, url-01:2> <hadoop, url-02:1>
// ...
}
}
// step 2 : reducer 由于combiner已经计算好各单词即key在各url出现的次数
// Reducer只需按照key把对应的value拼接即完成第二次合并。
public static class WordCountReducer extends //
Reducer<Text, Text, Text, Text> {
private Text outputValue = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// TODO
String result = new String();
for (Text value : values) {
result += value.toString() + "\t";
}
outputValue.set(result);
context.write(key, outputValue);
}
}
// step 3 : job
public int run(String[] args) throws Exception {
// 1 : get configuration
Configuration configuration = super.getConf();
// 2 : create job
Job job = Job.getInstance(//
configuration,//
this.getClass().getSimpleName());
job.setJarByClass(InvertedIndexMapReduce.class);
// job.setNumReduceTasks(tasks);
// 3 : set job
// input --> map --> reduce --> output
// 3.1 : input
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// 3.2 : mapper
job.setMapperClass(WordCountMapper.class);
// TODO
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// ====================shuffle==========================
// 1: partition
// job.setPartitionerClass(cls);
// 2: sort
// job.setSortComparatorClass(cls);
// 3: combine
job.setCombinerClass(InvertedIndexCombiner.class);
// 4: compress
// set by configuration
// 5 : group
// job.setGroupingComparatorClass(cls);
// ====================shuffle==========================
// 3.3 : reducer
job.setReducerClass(WordCountReducer.class);
// TODO
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 3.4 : output
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// 4 : submit job
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
public static void main(String[] args) throws Exception {
args = new String[] {
"hdfs://hadoop-senior.ibeifeng.com:8020/input/index.txt",
"hdfs://hadoop-senior.ibeifeng.com:8020/output6/"
};
// get configuration
Configuration configuration = new Configuration();
// configuration.set(name, value);
// run job
int status = ToolRunner.run(//
configuration,//
new InvertedIndexMapReduce(),//
args);
// exit program
System.exit(status);
}
}