@hadoopMan
2015-11-22T15:16:21.000000Z
字数 5144
阅读 939
初探mapreduce
apache-maven-3.0.5-bin.tar.gz
eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz
1.21首先将apache-maven-3.0.5-bin.tar.gz解压到/opt/compileHadoop/目录下。
$tar -zxvf apache-maven-3.0.5-bin.tar.gz ./
1.22配置环境变量。
使用sudo vi /etc/profile命令打开文件,在末尾添加内容如下:
$vi /etc/profile
添加
export MAVEN_HOME=/usr/local/apache-maven-3.0.5
export PATH=$PATH:$MAVEN_HOME/bin
1.23验证mvn是否安装成功
使用cd /opt/tools/ 进入eclipse 的安装目录,然后使用如下命令,将文件解压到当前目录内:
$tar –zxf eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz ./
使用cd eclipse命令进入软件主目录,可以查看软件的文件:
1.41 首先使用./eclipse命令打开eclipse程序。
1.42 安装mvn到eclipse。
在eclipse界面下打开window,出现如下窗口:
点击preference,弹出如下窗口,添加apache-maven-3.0.5安装目录,完成后如下:点击preference,弹出如下窗口,添加apache-maven-3.0.5安装目录,完成后如下:
打开user setting配置窗口如下:
首先我们来创建/home/hadoop/.m2目录:
创建成功后:
拷贝setting.xml到/home/hadoop/.m2目录下:
$cp /opt/compileHadoop/apache-maven-3.0.5/conf/settings.xml ~/.m2/
1.42创建mvn工程.
选中maven project点击next,进入下图,再点击next:
选择如下内容后点击next
填写工程组及工程名,点击finish.
添加一个存储配置文件目录,名字如下,点击finish。
拷贝core-site.xml、hdf-site.xml及log4j.properties到
/opt/tools/workspace/senior-hdfs/src/main/resource目录下,命令如下:
$cp /opt/modules/hadoop-2.5.0/etc/hadoop/core-site.xml /opt/tools/workspace/senior-hdfs/src/main/resource
$cp /opt/modules/hadoop-2.5.0/etc/hadoop/hdfs-site.xml /opt/tools/workspace/senior-hdfs/src/main/resource
$cp /opt/modules/hadoop-2.5.0/etc/hadoop/log4j.properties /opt/tools/workspace/senior-hdfs/src/main/resource
执行以上命令后,点击刷新目录,该目录下多出三个文件。
添加输出目录:
修改prom.xml内容为:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.5.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
到此处,一个完整的mvn工程算是建立完成。
程序内容如下:
package com.hadoop.hdfs.mapreduce;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCoundMapreduce {
//map
public static class WordCoundMaper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text mapOutputKey=new Text();
private final static IntWritable mapOutputValue=new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String lineValue=value.toString();//按行读取的数据转化为字符串
String[] strs = lineValue.split(" ");//按空格分割
for(String str:strs){
mapOutputKey.set(str);//Sting to TEXT文本类型
context.write(mapOutputKey, mapOutputValue);//写到上下文中,专递给reduce
}
}
}
//reduced `13
public static class WordCoundReducer extends Reducer<Text, IntWritable,Text ,IntWritable >{
private IntWritable outputValue=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum=0;//暂存key的总数值
for(IntWritable value:values){
sum+=value.get();//计算同一个key的元素数量
}
outputValue.set(sum);//int转化为IntWritable
context.write(key, outputValue);//通过写到上下文中来写道hdfs中
}
}
//driver
public int run(String[] args)throws Exception{
Configuration conf=new Configuration();//
Job job = Job.getInstance(conf, this.getClass().getCanonicalName());
job.setJarByClass(this.getClass());
//设置输入路径
Path inPath=new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
job.setMapperClass(WordCoundMaper.class);//设置map类
job.setMapOutputKeyClass(Text.class);//设置输出key类型
job.setMapOutputValueClass(IntWritable.class);//设置输出value类型
job.setReducerClass(WordCoundReducer.class);//设置reduce类
job.setOutputKeyClass(Text.class);//设置输出key类型
job.setOutputValueClass(IntWritable.class);//设置输出value类型
//设置输出路径
Path outPath=new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
boolean isSuccess=job.waitForCompletion(true);//等待作业完成
return isSuccess ? 0 : 1;
}
public static void main(String[] args) throws Exception{
args =new String[]{"hdfs://miaodonghua.host:8020/user/hadoop/input/",//
"hdfs://miaodonghua.host:8020/user/hadoop/output3"};
int status = new WordCoundMapreduce().run(args);
System.exit(status);
}
}
在当前用了目录下创建imput目录,并将/opt/datas/wc.input上传:
$bin/hdfs dfs -mkdir input/
$bin/hdfs dfs -put /opt/datas/wc.input input/
在eclipse工作空间内运行程序。
导出成功后,可在/opt/tools/目录下查看:
$bin/yarn jar /opt/tools/wordcount.jar com.hadoop.dfs.mapreduce.wordcountmapreduce /usr/hadoop/input /usr/hadoop/output
可以在webaap,查看到生成目录:
Mapreduce进行词频统计的时候主要分为两部分,即map和reduce.
(1) hdfs文件由inputSplit分割成不同的块,然后交由RecordReader按行形成对(这里key指的是行偏移,value指的是正行的内容)。
(2) map读取RecordReader形成的,经过处理生成,通过上下文context传递reduce。(这里key指的是单词,value指的是单词的数量.map生成的已经根据key进行了排序。)
(3)Reduce读取map生成的,然后将相同key的整合成使用context上下文写入到hdfs文件中.这里的key与value是用制表符分割。
注:map的数量由InputSplit决定,一个InputSplit对应一个map。而InputSplit的数量由文件的大小及数量决定。文件大小小于系统的block大小,则一个文件就对应一个InputSplit。如果文件大于了block块,则将文件按block分割一个分割块对应一个InputSplit。
完整的程序流程如下: