
Writing MapReduce Programs

Chapter 4: Hadoop 2.x in Depth


The MapReduce Boilerplate

In MapReduce, the map and reduce functions follow this general form:

map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
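
To make the generic signature concrete, here is a minimal sketch of how (K1, V1) and (K2, V2) are bound to Hadoop Writable types for word counting; the types match the WordCount example later in this section, while the class names WordCountTypes, M, and R are only illustrative:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountTypes {
    // map: (K1 = byte offset, V1 = line of text) -> list(K2 = word, V2 = count of 1)
    public static class M extends Mapper<LongWritable, Text, Text, IntWritable> { }
    // reduce: (K2 = word, list(V2) = partial counts) -> list(K3 = word, V3 = total count)
    public static class R extends Reducer<Text, IntWritable, Text, IntWritable> { }
}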

The Mapper base class:

protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
}

The Reducer base class:

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
}
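
One caveat worth noting here (standard Hadoop behavior, not stated in the original notes): the values Iterable can only be traversed once, and the framework reuses the same Writable instance across iterations, so extract each value as you go rather than storing references to it. A minimal sketch:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();  // read the int now; the IntWritable object is reused
        }
        context.write(key, new IntWritable(sum));
    }
}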

Context is the context object through which map() and reduce() interact with the framework, most importantly to emit output via context.write().
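
Beyond context.write(), a few other Context methods are commonly used. A brief sketch follows; the methods are standard Hadoop 2.x MapReduce API, but the property key "wc.separator" and the class name are made up for illustration:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ContextDemoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read a job configuration property ("wc.separator" is a hypothetical key)
        String sep = context.getConfiguration().get("wc.separator", " ");
        for (String word : value.toString().split(sep)) {
            // emit an intermediate key/value pair
            context.write(new Text(word), new IntWritable(1));
        }
        // increment a custom counter, visible in the job's counter output
        context.getCounter("wordcount", "lines").increment(1);
    }
}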

The MapReduce Boilerplate Template

public class *MapReduce {

    // Mapper
    public static class *Mapper extends Mapper<?..> {
        // TODO: declare reusable output key/value fields here
        @Override
        public void map(?..) throws IOException, InterruptedException {
            // TODO: map logic
        }
    }

    // Reducer
    public static class *Reducer extends Reducer<?..> {
        // TODO: declare reusable output value field here
        @Override
        protected void reduce(?..) throws IOException, InterruptedException {
            // TODO: reduce logic
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        // 1: get configuration
        Configuration config = new Configuration();
        // 2: create job
        Job job = Job.getInstance(config, this.getClass().getSimpleName());
        job.setJarByClass(*MapReduce.class);
        // 3: set job
        // input ---> map ---> reduce ---> output
        // 3.1: input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // 3.2: mapper
        job.setMapperClass(*Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.3: reducer
        job.setReducerClass(*Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // 3.5: submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new *MapReduce().run(args);
        System.exit(status);
    }
}
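
The template only overrides map() and reduce(), but the Mapper and Reducer base classes also provide setup() and cleanup() hooks (standard Hadoop API) that run once per task, before the first and after the last record. A minimal sketch, with the class name and counter labels invented for illustration:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SetupCleanupMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outputKey = new Text();
    private final static IntWritable ONE = new IntWritable(1);
    private long records;

    @Override
    protected void setup(Context context) {
        // called once per map task, before the first map() call
        records = 0;
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        records++;
        outputKey.set(value.toString());
        context.write(outputKey, ONE);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // called once per map task, after the last map() call
        context.getCounter("template", "records").increment(records);
    }
}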
WordCount example:

package org.apache.hadoop.studyhdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMapReduce {

    // step 1: Mapper
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            // split the line into words on ':'
            String[] strs = lineValue.split(":");
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step 2: Reducer
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the counts for this word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            // output
            context.write(key, outputValue);
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {
        // 3.01: new Configuration
        Configuration config = new Configuration();
        // 3.02: create job
        Job job = Job.getInstance(config, this.getClass().getSimpleName());
        job.setJarByClass(WordCountMapReduce.class);
        // 3.03: set job
        // input ---> map ---> reduce ---> output
        // 3.03.01: input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // 3.03.02: mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.03.03: reducer
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.03.04: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // 3.03.05: submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new WordCountMapReduce().run(args);
        System.exit(status);
    }
}
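
As a common refinement (a standard Hadoop pattern, not part of the original example), the driver can implement Tool and be launched through ToolRunner, which lets you pass -D options from the command line. The sketch below also registers the reducer as a combiner, which is safe here because integer summation is associative and commutative; the class name WordCountTool is invented, and it reuses the mapper and reducer classes from the example above:

package org.apache.hadoop.studyhdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already includes any -D options parsed by ToolRunner
        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        job.setJarByClass(WordCountTool.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordCountMapReduce.WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // the sum reducer can double as a map-side combiner to cut shuffle traffic
        job.setCombinerClass(WordCountMapReduce.WordCountReduce.class);
        job.setReducerClass(WordCountMapReduce.WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}

With this in place the job could be launched along the lines of bin/yarn jar wordcount.jar org.apache.hadoop.studyhdfs.WordCountTool -Dmapreduce.job.reduces=2 /input /output, where the jar name and paths are placeholders; mapreduce.job.reduces is the standard Hadoop 2.x property for the number of reduce tasks.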