Chapter 4: Hadoop 2.x in Depth
In Hadoop, the map and reduce functions have the following type signatures (K1/V1 are the input key/value types, K2/V2 the intermediate types, and K3/V3 the final output types):

map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
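
As a concrete illustration (a sketch assuming the colon-delimited WordCount input used later in this chapter, with the default TextInputFormat), one input line flows through the two functions like this:

// map input (K1 = byte offset, V1 = line):  (0, "hadoop:hive:hadoop")
// map output, list(K2, V2):                 (hadoop, 1), (hive, 1), (hadoop, 1)
// shuffle/sort groups values by key:        (hadoop, [1, 1]), (hive, [1])
// reduce output, list(K3, V3):              (hadoop, 2), (hive, 1)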
These are the map() and reduce() methods you override in org.apache.hadoop.mapreduce.Mapper and Reducer:

// Mapper: map() is called once per input record
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
}

// Reducer: reduce() is called once per key, with an Iterable
// over all of the values grouped under that key
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
}
Structurally, a MapReduce program is a single class with three sections: a Mapper, a Reducer, and a driver (the * below stands for the job name; ?.. marks the type parameters and arguments you fill in):

public class *MapReduceMs {

    // Mapper section
    public static class *Mapper extends Mapper<?..> {
        // TODO: declare reusable output key/value objects
        @Override
        public void map(?..) throws IOException, InterruptedException {
            // TODO: map logic
        }
    }

    // Reducer section
    public static class *Reduce extends Reducer<?..> {
        // TODO
        @Override
        protected void reduce(?..) throws IOException, InterruptedException {
            // TODO: reduce logic
        }
    }

    // Driver section
    public int run(String[] args) throws Exception {
        // 1 : create configuration
        Configuration config = new Configuration();
        // 2 : create job
        Job job = Job.getInstance(config, this.getClass().getSimpleName());
        job.setJarByClass(*MapReduceMs.class);
        // 3 : set job
        // input ---> map ---> reduce ---> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // 3.2 : mapper
        job.setMapperClass(*Mapper.class);
        job.setMapOutputKeyClass(?..);
        job.setMapOutputValueClass(?..);
        // 3.3 : reducer
        job.setReducerClass(*Reduce.class);
        job.setOutputKeyClass(?..);
        job.setOutputValueClass(?..);
        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // 3.5 : submit job and wait; true prints progress to the console
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new *MapReduceMs().run(args);
        System.exit(status);
    }
}
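
The run(String[]) / main() pair above matches Hadoop's Tool interface. A common refinement (a sketch, not part of the original template; the class name here is hypothetical) is to extend Configured and implement Tool, so that ToolRunner parses generic options such as -Dmapreduce.job.reduces=2 before run() is invoked:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // build and submit the job exactly as in the driver section above,
        // but take the Configuration from getConf() so -D options apply
        Configuration config = getConf();
        // ... create job, set input / mapper / reducer / output ...
        return 0; // placeholder: return job.waitForCompletion(true) ? 0 : 1
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new WordCountTool(), args);
        System.exit(status);
    }
}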
Take WordCount as an example:
package org.apache.hadoop.studyhdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMapReduce {

    // step 1: Mapper
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // reuse the same Writable objects across calls instead of
        // allocating new ones for every record
        private Text mapOutputKey = new Text();
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            // the sample input here is colon-delimited; adjust the
            // separator to match your data (e.g. "\\s+" for whitespace)
            String[] strs = lineValue.split(":");
            // emit (word, 1) for every word in the line
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step 2: Reducer
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum all counts for this word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            // emit (word, total count)
            context.write(key, outputValue);
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {
        // 3.1 : create configuration
        Configuration config = new Configuration();
        // 3.2 : create job
        Job job = Job.getInstance(config, this.getClass().getSimpleName());
        job.setJarByClass(WordCountMapReduce.class);
        // 3.3 : set job
        // input ---> map ---> reduce ---> output
        // 3.3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // 3.3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.3.3 : reducer
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.3.4 : output (the directory must not already exist, or the
        // job fails with FileAlreadyExistsException)
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // 3.4 : submit job and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new WordCountMapReduce().run(args);
        System.exit(status);
    }
}
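
To run the job (the jar name and HDFS paths below are illustrative, not from the original post), package the class into a jar, upload some input to HDFS, and submit it with yarn jar; with a single reducer the result lands in part-r-00000:

$ bin/hdfs dfs -mkdir -p /user/hadoop/input
$ bin/hdfs dfs -put wc.txt /user/hadoop/input
$ bin/yarn jar wordcount.jar org.apache.hadoop.studyhdfs.WordCountMapReduce /user/hadoop/input /user/hadoop/output
$ bin/hdfs dfs -text /user/hadoop/output/part-r-00000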