@Arslan6and6 · 2016-08-29

[Assignment 10] 殷杰

Chapter 5: Advanced Hadoop 2.x

--- Advanced MapReduce exercises: the Secondary Sort case and the MapReduce Join case

Assignment description:

Based on the Secondary Sort case and the MapReduce Join case covered in class, implement the code and analyze the key points. The assignment must cover the following:

1) Explain the Secondary Sort feature in your own words (including the custom data type, partitioning, grouping, and sorting).

2) Implement secondary sort and provide the source files.

3) Explain the different ways to join in MapReduce, implement a Reduce Join, provide the source code, and describe the approach.

1) Explain the Secondary Sort feature in your own words (including the custom data type, partitioning, grouping, and sorting).

1. What secondary sort means:
Sort first by the first field, then sort rows that share the same first field by the second field, without disturbing the result of the first sort.
For example, run the secondary sort program on the input file shown in figure 01.jpg
to obtain the sorted result shown in figure 02.jpg.
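The two figures are not reproduced here. Based on the sample keys referenced later in this post (a,1 a,100 a,3 in the partitioner discussion and a#1, a#3, a#100 in the reducer's Javadoc), the input and output look roughly like this; the b rows are made up only to show a second key:

```
Input (order as read):
a,1
a,100
a,3
b,6
b,2

Output (tab-separated, as written by the reducer):
a	1
a	3
a	100
b	2
b	6
```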

2. Approach
Make the map output key a composite key that combines the original key and the value.
The key comparator sorts on the composite key, i.e. on the original key first and then on the value.
The partitioner and the grouping comparator look only at the original (natural) key when partitioning and grouping.

3. Implementation notes

(1) Define a composite key made of the first field (the original key) and the second field (the value). The type is named PairWritable and combines a Text-like first field with an IntWritable-like second field. PairWritable implements the WritableComparable interface so that it can be serialized, deserialized, and compared.

(2) Configure the map-side shuffle partitioning by creating the FirstPartitioner class (figure 05.jpg, not reproduced here).
This step partitions on the first field of the composite key only, which makes it the first comparison in the whole MapReduce flow. In this example, once the partitioner is set, a,1 a,100 a,3 all go to the same reduce task, and so on for the other keys. Without a custom partitioner, the default HashPartitioner hashes the entire composite key, so records that share the same first field may land in different reduce tasks (figure 04.jpg).
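Figure 05.jpg showed the partitioner code; its core, taken from the full FirstPartitioner.java listed later in this post, is:

```java
// partition on the first (natural) key only, so all records that share
// the same first field are sent to the same reduce task
public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
    return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
```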

(3) Configure the reduce-side shuffle grouping by creating the FirstGroupingComparator class (figure 06.jpg). Without a custom grouping comparator, grouping falls back to the full composite key, so every distinct composite key forms its own group.
With it, values are placed into the same group whenever the first field of the key matches, rather than only when the whole composite key matches (figures 07.jpg and 03.jpg).
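Figures 06.jpg and 07.jpg showed the grouping comparator; its core, from the full FirstGroupingComparator.java listed later in this post, compares only the first field:

```java
// two composite keys belong to the same group if their first fields are equal
public int compare(PairWritable o1, PairWritable o2) {
    return o1.getFirst().compareTo(o2.getFirst());
}
```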

(4) In the PairWritable class, compareTo() orders records by the first field and then, within the same first field, by the second field (the value). This is what puts the values in order inside each natural key (figure 08.jpg).
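Figure 08.jpg showed this comparison; the corresponding method from the PairWritable.java listing below is:

```java
public int compareTo(PairWritable o) {
    // compare the first (natural) key
    int comp = this.getFirst().compareTo(o.getFirst());
    if (0 != comp) {
        return comp;
    }
    // first keys are equal: compare the second field (the value)
    return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
}
```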

2) Implement secondary sort and provide the source files.

SecondarySortMapReduce.java

```java
package org.apache.hadoop.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortMapReduce extends Configured implements Tool {

    // step 1 : mapper
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper extends
            Mapper<LongWritable, Text, PairWritable, IntWritable> {

        private PairWritable mapOutputKey = new PairWritable();
        private IntWritable mapOutputValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line -- split
            String[] strs = value.toString().split(",");
            // map output key
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            // map output value
            mapOutputValue.set(Integer.valueOf(strs[1]));
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2 : reducer
    /**
     * a#1 1 a#3 3 a#100 100
     *
     * @author beifeng
     */
    public static class WordCountReducer extends
            Reducer<PairWritable, IntWritable, Text, IntWritable> {

        private Text outputKey = new Text();

        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // set output key
            outputKey.set(key.getFirst());
            for (IntWritable value : values) {
                context.write(outputKey, value);
            }
        }
    }

    // step 3 : job
    public int run(String[] args) throws Exception {
        // 1 : get configuration
        Configuration configuration = super.getConf();
        // 2 : create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(SecondarySortMapReduce.class);
        // job.setNumReduceTasks(tasks);

        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ====================shuffle==========================
        // 1: partition
        job.setPartitionerClass(FirstPartitioner.class);
        // 2: sort
        // job.setSortComparatorClass(cls);
        // 3: combine
        // job.setCombinerClass(cls);
        // 4: compress
        // set by configuration
        // 5: group
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        // ====================shuffle==========================

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // args = new String[] {
        //     "hdfs://hadoop-senior.ibeifeng.com:8020/input/sort.txt",
        //     "hdfs://hadoop-senior.ibeifeng.com:8020/output2/"
        // };
        // get configuration
        Configuration configuration = new Configuration();
        // configuration.set(name, value);
        // run job
        int status = ToolRunner.run(configuration, new SecondarySortMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
```

PairWritable.java

```java
package org.apache.hadoop.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.setFirst(first);
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PairWritable other = (PairWritable) obj;
        if (first == null) {
            if (other.first != null)
                return false;
        } else if (!first.equals(other.first))
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getFirst());
        out.writeInt(this.getSecond());
    }

    public void readFields(DataInput in) throws IOException {
        this.setFirst(in.readUTF());
        this.setSecond(in.readInt());
    }

    public int compareTo(PairWritable o) {
        // first
        int comp = this.getFirst().compareTo(o.getFirst());
        if (0 != comp) {
            return comp;
        }
        // second
        return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
    }

    @Override
    public String toString() {
        return first + "," + second;
    }
}
```

FirstPartitioner.java

```java
package org.apache.hadoop.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        // partition on the first (natural) key only
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

FirstGroupingComparator.java

```java
package org.apache.hadoop.sort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<PairWritable> {

    public int compare(PairWritable o1, PairWritable o2) {
        // group only by the first (natural) key
        return o1.getFirst().compareTo(o2.getFirst());
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // PairWritable is serialized as writeUTF(first) followed by writeInt(second),
        // so dropping the trailing 4 bytes compares only the first field.
        // Note: the comparison must start at the offsets s1/s2 (not 0), because the
        // keys sit inside a larger serialized buffer.
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
```

3) Explain the different ways to join in MapReduce, implement a Reduce Join, provide the source code, and describe the approach.

a) The different ways to join

map join: one of the two tables to be joined is very large and the other is very small, so the small table can be loaded into memory on every map task, typically shipped with DistributedCache (a hedged sketch follows this list).
reduce join: both tables to be joined are large; the join is done on the reduce side.
semi join: a combination of the map-side join and the reduce-side join.
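A minimal sketch of the map-join idea, assuming the customers.txt / orders.txt layout described below, with customers.txt registered as a cached file via job.addCacheFile(...); the class name and field handling here are illustrative and not part of the original assignment code:

```java
// Hypothetical map-side join sketch (not the original assignment code).
// customers.txt (the small table) is read from the distributed cache into a
// HashMap in setup(); orders.txt (the large table) is the normal map input
// and is joined against that map record by record.
package org.apache.hadoop.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private final Map<String, String> customerById = new HashMap<String, String>();
    private final Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // assumes the driver registered customers.txt via job.addCacheFile(...)
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);
        FileSystem fs = FileSystem.get(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
        String line;
        while ((line = reader.readLine()) != null) {
            // customer line: id,name,phone  ->  map: id -> "name,phone"
            int comma = line.indexOf(',');
            customerById.put(line.substring(0, comma), line.substring(comma + 1));
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // order line: customerId,product,price,date
        String[] fields = value.toString().split(",", 2);
        String customer = customerById.get(fields[0]);
        if (customer != null) {
            // emit: customerId,name,phone,product,price,date
            outputValue.set(fields[0] + "," + customer + "," + fields[1]);
            context.write(NullWritable.get(), outputValue);
        }
    }
}
```

The DistributedCacheWCMapReduce listing later in this section shows the same cache-reading pattern in its setup() method.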

b) Case and approach

Input files:
customers.txt, fields in order: customer ID, customer name, customer phone.
File contents:
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
orders.txt, fields in order: customer ID, product name, price, order date.
File contents:
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009

Requirement: join the contents of the two files on the shared key and list them in one combined "virtual table" file, e.g.
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
...

Analysis / approach:

Map side: use the join field as the output key; in this example the join field is the first column, the customer ID, in both files. Tag each record with a flag that marks which file it came from.
Reduce side: separate the records by their flag, join the contents of the two files for each key, and write the combined records to one output file. A customer with several orders produces one output line per order. (A hedged sketch of this mapper/reducer appears after the DataJoinWritable class below.)

Code:
DistributedCacheWCMapReduce.java (a DistributedCache word-count example; it demonstrates reading a cached file in setup(), which is the mechanism a map-side join relies on)

```java
package com.ibeifeng.hadoop.mapreduce.cache;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedCacheWCMapReduce extends Configured implements Tool {

    /**
     * Mapper
     */
    public static class DistributedCacheWCMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        // cache: lines read from the distributed cache file
        List<String> list = new ArrayList<String>();

        private final static IntWritable mapOutputValue = new IntWritable(1);
        private Text mapOutputKey = new Text();

        @SuppressWarnings("deprecation")
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // step 1: get configuration
            Configuration conf = context.getConfiguration();
            // step 2: get cache uri
            URI[] uris = DistributedCache.getCacheFiles(conf);
            // step 3: path
            Path path = new Path(uris[0]);
            // step 4: file system
            FileSystem fs = FileSystem.get(conf);
            // step 5: input stream
            InputStream inStream = fs.open(path);
            // step 6: read data
            InputStreamReader isr = new InputStreamReader(inStream);
            BufferedReader bf = new BufferedReader(isr);
            String line;
            while ((line = bf.readLine()) != null) {
                if (line.trim().length() > 0) {
                    // add element
                    list.add(line);
                }
            }
            bf.close();
            isr.close();
            inStream.close();
            // fs.close();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            StringTokenizer st = new StringTokenizer(lineValue);
            while (st.hasMoreTokens()) {
                String wordValue = st.nextToken();
                // skip words that appear in the cached file
                if (list.contains(wordValue)) {
                    continue;
                }
                // set map output key
                mapOutputKey.set(wordValue);
                // output
                context.write(mapOutputKey, mapOutputValue);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }
    }

    /**
     * Reducer
     */
    public static class DistributedCacheWCReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        public IntWritable outputValue = new IntWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // iterate
            for (IntWritable value : values) {
                sum += value.get();
            }
            // set
            outputValue.set(sum);
            // job output
            context.write(key, outputValue);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }
    }

    /**
     * Driver: job create, set, submit, run, monitor
     */
    public int run(String[] args) throws Exception {
        // step 1: get configuration
        Configuration conf = this.getConf();
        // step 2: create job
        Job job = this.parseInputAndOutput(this, conf, args);
        // step 4: set job
        // 2: mapper class
        job.setMapperClass(DistributedCacheWCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 4: reducer class
        job.setReducerClass(DistributedCacheWCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // step 5: submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        // validate args
        if (args.length != 2) {
            System.err.println("Usage : " + tool.getClass().getSimpleName()
                    + " [generic options] <input> <output>");
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }
        // step 2: create job
        Job job = Job.getInstance(conf, tool.getClass().getSimpleName());
        // job.addCacheFile(uri);
        // step 3: set job run class
        job.setJarByClass(tool.getClass());
        // 1: input format
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);
        // 5: output format
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        // args = new String[]{
        //     "hdfs://hadoop-yarn.beifeng.com:8020/user/beifeng/mr/distributedcache/input",
        //     "hdfs://hadoop-yarn.beifeng.com:8020/user/beifeng/mr/distributedcache/ouput"
        // };
        // mapreduce-default.xml, mapreduce-site.xml
        Configuration conf = new Configuration();
        // set distributed cache
        // ===============================================================
        URI uri = new URI("/user/beifeng/cachefile/cache.txt");
        DistributedCache.addCacheFile(uri, conf);
        // ===============================================================
        // run mapreduce
        int status = ToolRunner.run(conf, new DistributedCacheWCMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
```

The map output value for the join uses a custom data type named DataJoinWritable, so we create the DataJoinWritable class.

DataJoinWritable.java

```java
package org.apache.hadoop.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataJoinWritable implements Writable {

    private String tag;
    private String data;

    public DataJoinWritable() {
    }

    public DataJoinWritable(String tag, String data) {
        this.set(tag, data);
    }

    public void set(String tag, String data) {
        this.setTag(tag);
        this.setData(data);
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((data == null) ? 0 : data.hashCode());
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataJoinWritable other = (DataJoinWritable) obj;
        if (data == null) {
            if (other.data != null)
                return false;
        } else if (!data.equals(other.data))
            return false;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        return true;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.setTag(in.readUTF());
        this.setData(in.readUTF());
    }

    @Override
    public String toString() {
        return tag + "," + data;
    }
}
```
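The post does not include the join mapper and reducer themselves, so the following is a minimal sketch of the reduce-side join described in the analysis above, using the DataJoinWritable type and the customers.txt / orders.txt layout of this example. The class names, the "customer"/"order" tag values, and the use of the field count to tell the two files apart are assumptions for illustration, not the original assignment code:

```java
package org.apache.hadoop.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DataJoinMapReduce {

    // Map side: emit the customer ID as the key and tag each record with the
    // file it came from. customers.txt has 3 fields and orders.txt has 4, so
    // the field count is used here to tell them apart (an assumption).
    public static class DataJoinMapper extends
            Mapper<LongWritable, Text, Text, DataJoinWritable> {

        private Text outputKey = new Text();
        private DataJoinWritable outputValue = new DataJoinWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length == 3) {
                // customer record: id,name,phone
                outputKey.set(fields[0]);
                outputValue.set("customer", fields[1] + "," + fields[2]);
            } else if (fields.length == 4) {
                // order record: id,product,price,date
                outputKey.set(fields[0]);
                outputValue.set("order", fields[1] + "," + fields[2] + "," + fields[3]);
            } else {
                return; // malformed line
            }
            context.write(outputKey, outputValue);
        }
    }

    // Reduce side: for one customer ID, separate the tagged records, then
    // combine the single customer record with every order record.
    public static class DataJoinReducer extends
            Reducer<Text, DataJoinWritable, NullWritable, Text> {

        private Text outputValue = new Text();

        @Override
        public void reduce(Text key, Iterable<DataJoinWritable> values, Context context)
                throws IOException, InterruptedException {
            String customerInfo = null;
            List<String> orders = new ArrayList<String>();
            for (DataJoinWritable value : values) {
                if ("customer".equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if ("order".equals(value.getTag())) {
                    orders.add(value.getData());
                }
            }
            // only emit customers that actually have orders (inner join)
            if (customerInfo == null || orders.isEmpty()) {
                return;
            }
            // one output line per order: id,name,phone,product,price,date
            for (String order : orders) {
                outputValue.set(key.toString() + "," + customerInfo + "," + order);
                context.write(NullWritable.get(), outputValue);
            }
        }
    }
}
```

A driver for this sketch would mirror SecondarySortMapReduce.run(): set DataJoinMapper and DataJoinReducer, set the map output key/value classes to Text and DataJoinWritable, and set the final output key/value classes to NullWritable and Text.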