/*
 * @ghimi
 * 2018-05-25T12:50:20.000000Z
 * Word count: 4369
 * Views: 789
 * MapReduce
 * Weather
 */
package mapreduce.tq2;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Main {
/**
 * Grouping comparator.
 *
 * <p>Keys entering a reducer are already sorted. Consecutive keys for which
 * {@link #compare} returns 0 are treated as one group, so their values are
 * handed to a single {@code reduce()} call even when the keys themselves are
 * not equal. Conversely, keys that do not compare as equal here are never
 * merged into the same group.
 *
 * <p>This comparator looks only at year and month, so every record of the
 * same year-month forms one group.
 */
public static class TGroupingComparator extends WritableComparator {

    public TGroupingComparator() {
        // 'true' asks WritableComparator to create TQ instances for deserialization.
        super(TQ.class, true);
    }

    /**
     * Orders two {@code TQ} keys by year, then by month; day and temperature
     * are ignored.
     *
     * @return negative, zero or positive following the usual comparator contract
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final TQ left = (TQ) a;
        final TQ right = (TQ) b;

        final int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
/**
 * Sort comparator used when ordering map output keys. If no sort comparator
 * is configured, the key's own comparison is used instead.
 *
 * <p>Order: year ascending, month ascending, then {@code wd} descending.
 */
public static class TSorter extends WritableComparator {

    public TSorter() {
        // 'true' asks WritableComparator to create TQ instances for deserialization.
        super(TQ.class, true);
    }

    /**
     * Compares two {@code TQ} keys by year, then month, then {@code wd} in
     * reverse order.
     *
     * @return negative, zero or positive following the usual comparator contract
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final TQ left = (TQ) a;
        final TQ right = (TQ) b;

        final int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        final int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        // Operands are swapped on purpose: larger wd sorts first.
        return Integer.compare(right.getWd(), left.getWd());
    }
}
/**
 * Partitioner that routes each map output record to a partition derived from
 * the key's year, so all records of one year land in the same partition.
 */
public static class TPartitioner extends Partitioner<TQ, IntWritable> {

    /**
     * Picks the partition for a record.
     *
     * @param key           map output key; only its year is consulted
     * @param value         map output value (unused)
     * @param numPartitions total number of partitions
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // Clear the sign bit before taking the remainder: a plain '%' yields a
        // negative index for a negative year, which is not a valid partition.
        // Non-negative years map exactly as before.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }
}
public class TQ implements WritableComparable<TQ> {
private int year;
private int month;
private int day;
private int wd;
public int getYear() {
return this.year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return this.month;
}
public void setMonth(int month) {
this.month = month;
}
public int getDay() {
return this.day;
}
public void setDay(int day) {
this.day = day;
}
public int getWd() {
return this.wd;
}
public void setWd(int wd) {
this.wd = wd;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(month);
out.writeInt(day);
out.writeInt(wd);
}
@Override
public void readFields(DataInput in) throws IOException {
this.year = in.readInt();
this.month = in.readInt();
this.day = in.readInt();
this.wd = in.readInt();
}
@Override
public int compareTo(TQ o) {
int compare = Integer.compare(this.year, o.getYear());
if (compare == 0) {
compare = Integer.compare(this.month, o.getMonth());
if (compare == 0) {
return Integer.compare(this.day, o.getDay());
}
}
return compare;
}
}
/**
 * Reduce-side logic. One {@code reduce()} call receives every record of a
 * single year-month (see {@code TGroupingComparator}), ordered by {@code wd}
 * descending (see {@code TSorter}).
 *
 * <p>The original body computed an output key but never wrote anything to the
 * context, so the job produced no output. This version emits the first record
 * of the group and the first following record that falls on a different day,
 * i.e. the two highest {@code wd} readings of the month taken from distinct
 * days.
 * NOTE(review): this intent is reconstructed from the previously unused
 * {@code flg}/{@code day} locals and the descending sort — confirm.
 */
public static class TReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {
    private final Text rkey = new Text();
    private final IntWritable rval = new IntWritable();

    /**
     * Emits at most two {@code "year-month-day" -> wd} records per group.
     *
     * @param key     current group key; Hadoop updates this object in place as
     *                {@code values} is iterated, so its fields always describe
     *                the record currently being visited
     * @param values  values of the group; iterated only to advance {@code key}
     * @param context sink for the output records
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values,
            Reducer<TQ, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        boolean firstEmitted = false;
        int firstDay = 0;
        for (IntWritable ignored : values) {
            if (!firstEmitted) {
                // First record of the group carries the highest wd of the month.
                emit(key, context);
                firstDay = key.getDay();
                firstEmitted = true;
            } else if (key.getDay() != firstDay) {
                // Next-highest reading from another day; nothing more to emit.
                emit(key, context);
                break;
            }
        }
    }

    /** Writes the record currently described by {@code key} as "year-month-day" -> wd. */
    private void emit(TQ key, Reducer<TQ, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
        rval.set(key.getWd());
        context.write(rkey, rval);
    }
}
// Mapper-side logic. The class body is empty, so no map() override is defined here.
// NOTE(review): the declared output key type is Text, whereas main() registers TQ as the
// map output key class and TPartitioner is typed on TQ keys — confirm and reconcile.
public static class TMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
}
/**
 * Job entry point: configures the MapReduce job, clears any previous output
 * directory, runs the job and exits with a status reflecting its outcome.
 *
 * @param args command-line arguments (unused)
 * @throws IOException            if the file system or job submission fails
 * @throws ClassNotFoundException if a configured job class cannot be loaded
 * @throws InterruptedException   if waiting for the job is interrupted
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(Main.class);

    job.setMapperClass(TMapper.class);
    job.setMapOutputKeyClass(TQ.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Partitioner: decides which partition each map output record goes to.
    job.setPartitionerClass(TPartitioner.class);
    // Sort comparator: defines the order of the map output keys.
    job.setSortComparatorClass(TSorter.class);
    // Grouping comparator: defines which sorted keys share one reduce() call.
    job.setGroupingComparatorClass(TGroupingComparator.class);

    job.setReducerClass(TReducer.class);
    // Declare the final output types to match TReducer's <Text, IntWritable>.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(2);

    Path infile = new Path("/data/tq/input");
    FileInputFormat.addInputPath(job, infile);

    // Remove the output path if it already exists, so the job can write to it.
    Path outfile = new Path("/data/tq/output/");
    if (outfile.getFileSystem(conf).exists(outfile)) {
        outfile.getFileSystem(conf).delete(outfile, true);
    }
    FileOutputFormat.setOutputPath(job, outfile);

    // Propagate the job result to the caller instead of discarding it, so a
    // failed job is visible as a non-zero exit status.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
}
}