@Arslan6and6
2016-04-30T16:13:31.000000Z
字数 3775
阅读 777
第四章、深入Hadoop 2.x
map: (K1, V1) → list(K2, V2) //K1
reduce: (K2, list(V2)) → list(K3, V3)
/**
 * Signature of the map hook that a concrete Mapper overrides.
 *
 * <p>Corresponds to the map side of the model shown in this article:
 * {@code map: (K1, V1) -> list(K2, V2)}. The stub body is intentionally empty.
 *
 * @param key     the input key (K1)
 * @param value   the input value (V1)
 * @param context the task context handed to the method
 * @throws IOException          declared by the hook's contract
 * @throws InterruptedException declared by the hook's contract
 */
protected void map(KEY key, VALUE value, Context context)
        throws IOException, InterruptedException {
}
/**
 * Signature of the reduce hook that a concrete Reducer overrides.
 *
 * <p>Corresponds to the reduce side of the model shown in this article:
 * {@code reduce: (K2, list(V2)) -> list(K3, V3)}. The stub body is intentionally empty.
 *
 * @param key     the key (K2)
 * @param values  the values (list(V2)) supplied together with {@code key};
 *                typed as {@code Iterable<VALUE>} rather than a raw {@code Iterable}
 * @param context the task context handed to the method
 * @throws IOException          declared by the hook's contract
 * @throws InterruptedException declared by the hook's contract
 */
protected void reduce(KEY key, Iterable<VALUE> values, Context context)
        throws IOException, InterruptedException {
}
public class *MapReduceMs {Mapper区public static class *Mapper extends Mapper<?..> {//TODO@Overridepublic void map(?..) throws Exception {//TODO}}Reduce区public static class *Reduce extends Reducer<?..> {//TODO@Overrideprotected void reduce(?.. ) throws Exception {//TODO}}Driver区public int run (String[] args) throws Exception {Configuration config = new Configuration();// 2 : Create jobJob job =Job.getInstance(config, this.getClass().getSimpleName());job.setJarByClass(WordCountMapReduce.class);//3 : set job//input---> map ---> reduce ---> output//3.1 : inputPath inPath = new Path(args[0]);FileInputFormat.addInputPath(job, inPath);//3.1 mapperjob.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//3.2 reducejob.setReducerClass(WordCountReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//3.3 outputPath outPath = new Path(args[1]);FileOutputFormat.setOutputPath(job, outPath);//3.4 sumbmit jobboolean isSuccess = job.waitForCompletion(true);return isSuccess ? 0 : 1 ;}public static void main(String[] args) throws Exception {int status = new WordCountMapReduce().run(args);System.exit(status);}}
以wordcount举例:
package org.apache.hadoop.studyhdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * WordCount example: counts how many times each token occurs in the input.
 *
 * <p>Usage: {@code WordCountMapReduce <input path> <output path>}.
 */
public class WordCountMapReduce {

    // step 1 : Mapper
    /**
     * Splits every input line on {@code ":"} and emits {@code (token, 1)} per token.
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Constant count written for every token. */
        private static final IntWritable ONE = new IntWritable(1);

        /** Reused output key object; it is overwritten for each token. */
        private final Text mapOutputKey = new Text();

        /**
         * @param key     input key (not used by this mapper)
         * @param value   one line of input text
         * @param context receives the {@code (token, 1)} pairs
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split(":");
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, ONE);
            }
        }
    }

    // step 2 : Reducer
    /**
     * Sums the counts received for a key and emits {@code (key, sum)}.
     */
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Reused output value object; it is overwritten for each key. */
        private final IntWritable outputValue = new IntWritable();

        /**
         * @param key     the token being counted
         * @param values  the counts emitted for {@code key}
         * @param context receives the {@code (key, sum)} pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // temp sum
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);

            // output
            context.write(key, outputValue);
        }
    }

    // step 3 : Driver
    /**
     * Configures the job, submits it and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
     * @throws Exception if job setup or execution fails
     */
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: " + this.getClass().getSimpleName()
                    + " <input path> <output path>");
            return 2;
        }

        // 3.01 : new Configuration
        Configuration config = new Configuration();

        // 3.02 : Create job
        Job job = Job.getInstance(config, this.getClass().getSimpleName());
        job.setJarByClass(WordCountMapReduce.class);

        // 3.03 : Set job
        // input ---> map ---> reduce ---> output
        // 3.03.01 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.03.02 : mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3.03.03 : reducer
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.03.04 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 3.04 : Submit job and wait for it to finish
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new WordCountMapReduce().run(args);
        System.exit(status);
    }
}