[关闭]
@ghimi 2018-05-25T12:50:20.000000Z 字数 4369 阅读 789

MapReduce 实现的天气案例

MapReduce 天气


  1. package mapreduce.tq2;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.util.Date;
  6. import java.util.Iterator;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.IntWritable;
  10. import org.apache.hadoop.io.LongWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.WritableComparable;
  13. import org.apache.hadoop.io.WritableComparator;
  14. import org.apache.hadoop.mapreduce.Job;
  15. import org.apache.hadoop.mapreduce.Mapper;
  16. import org.apache.hadoop.mapreduce.Partitioner;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. public class Main {
  21. //分组比较器
  22. /*
  23. * 进入同一个reduce的key是按照顺序排好的,该类使得:
  24. * 如果连续(注意,一定连续)的两条或多条记录满足同组(即compare方法返回0)的条件,
  25. * 即使key不相同,他们的value也会进入同一个values,执行一个reduce方法。
  26. * 相反,如果原来key相同,但是并不满足同组的条件,他们的value也不会进入一个valeus。
  27. * 最后返回的key是:满足这些条件的一组key中排在最后的那个。
  28. */
  29. public static class TGroupingComparator extends WritableComparator {
  30. public TGroupingComparator() {
  31. super(TQ.class, true);
  32. }
  33. @SuppressWarnings("rawtypes")
  34. @Override
  35. public int compare(WritableComparable a, WritableComparable b) {
  36. TQ t1 = (TQ) a;
  37. TQ t2 = (TQ) b;
  38. int compare = Integer.compare(t1.getYear(), t2.getYear());
  39. // 分组比较器比较年和月份,使得同一年月的记录位于同一ReduceTask当中
  40. return compare == 0 ? Integer.compare(t1.getMonth(), t2.getMonth()) : compare;
  41. }
  42. }
  43. //排序比较器,进行 Mapper 端的排序,如果没有制定的话,默认使用 KEY 进行比较
  44. public static class TSorter extends WritableComparator {
  45. public TSorter() {
  46. super(TQ.class, true);
  47. }
  48. @SuppressWarnings("rawtypes")
  49. @Override
  50. public int compare(WritableComparable a, WritableComparable b) {
  51. TQ t1 = (TQ) a;
  52. TQ t2 = (TQ) b;
  53. int compare = Integer.compare(t1.getYear(), t2.getYear());
  54. if (compare == 0) {
  55. compare = Integer.compare(t1.getMonth(), t2.getMonth());
  56. if (compare == 0) {
  57. return -Integer.compare(t1.getWd(), t2.getWd());
  58. }
  59. }
  60. return compare;
  61. }
  62. }
  63. //分区器,用来对不同的 key 进行分区
  64. public static class TPartitioner extends Partitioner<TQ, IntWritable> {
  65. @Override
  66. public int getPartition(TQ key, IntWritable value, int numPartitions) {
  67. return key.getYear() % numPartitions;
  68. }
  69. }
  70. public class TQ implements WritableComparable<TQ> {
  71. private int year;
  72. private int month;
  73. private int day;
  74. private int wd;
  75. public int getYear() {
  76. return this.year;
  77. }
  78. public void setYear(int year) {
  79. this.year = year;
  80. }
  81. public int getMonth() {
  82. return this.month;
  83. }
  84. public void setMonth(int month) {
  85. this.month = month;
  86. }
  87. public int getDay() {
  88. return this.day;
  89. }
  90. public void setDay(int day) {
  91. this.day = day;
  92. }
  93. public int getWd() {
  94. return this.wd;
  95. }
  96. public void setWd(int wd) {
  97. this.wd = wd;
  98. }
  99. @Override
  100. public void write(DataOutput out) throws IOException {
  101. out.writeInt(year);
  102. out.writeInt(month);
  103. out.writeInt(day);
  104. out.writeInt(wd);
  105. }
  106. @Override
  107. public void readFields(DataInput in) throws IOException {
  108. this.year = in.readInt();
  109. this.month = in.readInt();
  110. this.day = in.readInt();
  111. this.wd = in.readInt();
  112. }
  113. @Override
  114. public int compareTo(TQ o) {
  115. int compare = Integer.compare(this.year, o.getYear());
  116. if (compare == 0) {
  117. compare = Integer.compare(this.month, o.getMonth());
  118. if (compare == 0) {
  119. return Integer.compare(this.day, o.getDay());
  120. }
  121. }
  122. return compare;
  123. }
  124. }
  125. // Reducer 端的聚合逻辑
  126. public static class TReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {
  127. private Text rkey = new Text();
  128. private IntWritable rval = new IntWritable();
  129. @Override
  130. protected void reduce(TQ key, Iterable<IntWritable> values,
  131. Reducer<TQ, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  132. int flg = 0;
  133. int day = 0;
  134. for (IntWritable intWritable : values) {
  135. if(flg == 0){
  136. rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
  137. }
  138. }
  139. }
  140. }
  141. //Mapper 端的聚合逻辑
  142. public static class TMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  143. }
  144. //作业程序的主要入口
  145. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  146. // TODO Auto-generated method stub
  147. Configuration conf = new Configuration();
  148. Job job = Job.getInstance(conf);
  149. job.setJarByClass(Main.class);
  150. job.setMapperClass(TMapper.class);
  151. job.setMapOutputKeyClass(TQ.class);
  152. job.setMapOutputValueClass(IntWritable.class);
  153. // mapTask中的分区器
  154. job.setPartitionerClass(TPartitioner.class);
  155. // mapTask中的排序比较器
  156. job.setSortComparatorClass(TSorter.class);
  157. // mapTask中的分组比较器
  158. job.setGroupingComparatorClass(TGroupingComparator.class);
  159. job.setReducerClass(TReducer.class);
  160. job.setNumReduceTasks(2);
  161. Path infile = new Path("/data/tq/input");
  162. FileInputFormat.addInputPath(job, infile);
  163. //当输出路径存在时删除输出路径
  164. Path outfile = new Path("/data/tq/output/");
  165. //当输出路径存在时删除输出路径
  166. if(outfile.getFileSystem(conf).exists(outfile)){
  167. outfile.getFileSystem(conf).delete(outfile,true);
  168. }
  169. //设置输出路径
  170. FileOutputFormat.setOutputPath(job, outfile);
  171. job.waitForCompletion(true);
  172. }
  173. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注