[关闭]
@hadoopMan 2015-11-22T15:16:21.000000Z 字数 5144 阅读 939

mapreduce单词统计案例编程

初探mapreduce


1,环境搭建

1.1 准备软件

apache-maven-3.0.5-bin.tar.gz
eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz

1.2 安装apache-maven-3.0.5

1.21首先将apache-maven-3.0.5-bin.tar.gz解压到/opt/compileHadoop/目录下。

  1. $tar -zxvf apache-maven-3.0.5-bin.tar.gz ./

1.22配置环境变量。
使用sudo vi /etc/profile命令打开文件,在末尾添加内容如下:

  1. $vi /etc/profile
  2. 添加
  3. export MAVEN_HOME=/usr/local/apache-maven-3.0.5
  4. export PATH=$PATH:$MAVEN_HOME/bin

1.23验证mvn是否安装成功
mvn安装成功

1.3 安装eclipse

使用cd /opt/tools/ 进入eclipse 的安装目录,然后使用如下命令,将文件解压到当前目录内:

  1. $tar zxf eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz ./

使用cd eclipse命令进入软件主目录,可以查看软件的文件:
list

1.4 使用eclipse创建mvn工程。

1.41 首先使用./eclipse命令打开eclipse程序。
eclipse
1.42 安装mvn到eclipse。
在eclipse界面下打开window,出现如下窗口:
window
点击preference,弹出如下窗口,添加apache-maven-3.0.5安装目录,完成后如下:点击preference,弹出如下窗口,添加apache-maven-3.0.5安装目录,完成后如下:
prefrence
打开user setting配置窗口如下:
setting
首先我们来创建/home/hadoop/.m2目录:
.m2
创建成功后:
成功
拷贝setting.xml到/home/hadoop/.m2目录下:

  1. $cp /opt/compileHadoop/apache-maven-3.0.5/conf/settings.xml ~/.m2/

1.42创建mvn工程.
create1
create2
选中maven project点击next,进入下图,再点击next:
create3
选择如下内容后点击next
create4
填写工程组及工程名,点击finish.
createfinish
添加一个存储配置文件目录,名字如下,点击finish。
resource
拷贝core-site.xml、hdf-site.xml及log4j.properties到
/opt/tools/workspace/senior-hdfs/src/main/resource目录下,命令如下:

  1. $cp /opt/modules/hadoop-2.5.0/etc/hadoop/core-site.xml /opt/tools/workspace/senior-hdfs/src/main/resource
  2. $cp /opt/modules/hadoop-2.5.0/etc/hadoop/hdfs-site.xml /opt/tools/workspace/senior-hdfs/src/main/resource
  3. $cp /opt/modules/hadoop-2.5.0/etc/hadoop/log4j.properties /opt/tools/workspace/senior-hdfs/src/main/resource

执行以上命令后,点击刷新目录,该目录下多出三个文件。
list
添加输出目录:
output
修改prom.xml内容为:

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <hadoop.version>2.5.0</hadoop.version>
  4. </properties>
  5. <dependencies>
  6. <dependency>
  7. <groupId>org.apache.hadoop</groupId>
  8. <artifactId>hadoop-client</artifactId>
  9. <version>${hadoop.version}</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>junit</groupId>
  13. <artifactId>junit</artifactId>
  14. <version>4.10</version>
  15. <scope>test</scope>
  16. </dependency>
  17. </dependencies>

到此处,一个完整的mvn工程算是建立完成。

2,基于【八股文】格式编写wordcount程序

程序内容如下:

  1. package com.hadoop.hdfs.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.conf.Configuration;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. public class WordCoundMapreduce {
  14. //map
  15. public static class WordCoundMaper extends Mapper<LongWritable, Text, Text, IntWritable>{
  16. private Text mapOutputKey=new Text();
  17. private final static IntWritable mapOutputValue=new IntWritable(1);
  18. @Override
  19. protected void map(LongWritable key, Text value, Context context)
  20. throws IOException, InterruptedException {
  21. String lineValue=value.toString();//按行读取的数据转化为字符串
  22. String[] strs = lineValue.split(" ");//按空格分割
  23. for(String str:strs){
  24. mapOutputKey.set(str);//Sting to TEXT文本类型
  25. context.write(mapOutputKey, mapOutputValue);//写到上下文中,专递给reduce
  26. }
  27. }
  28. }
  29. //reduced `13
  30. public static class WordCoundReducer extends Reducer<Text, IntWritable,Text ,IntWritable >{
  31. private IntWritable outputValue=new IntWritable();
  32. @Override
  33. protected void reduce(Text key, Iterable<IntWritable> values,Context context)
  34. throws IOException, InterruptedException {
  35. int sum=0;//暂存key的总数值
  36. for(IntWritable value:values){
  37. sum+=value.get();//计算同一个key的元素数量
  38. }
  39. outputValue.set(sum);//int转化为IntWritable
  40. context.write(key, outputValue);//通过写到上下文中来写道hdfs中
  41. }
  42. }
  43. //driver
  44. public int run(String[] args)throws Exception{
  45. Configuration conf=new Configuration();//
  46. Job job = Job.getInstance(conf, this.getClass().getCanonicalName());
  47. job.setJarByClass(this.getClass());
  48. //设置输入路径
  49. Path inPath=new Path(args[0]);
  50. FileInputFormat.addInputPath(job, inPath);
  51. job.setMapperClass(WordCoundMaper.class);//设置map类
  52. job.setMapOutputKeyClass(Text.class);//设置输出key类型
  53. job.setMapOutputValueClass(IntWritable.class);//设置输出value类型
  54. job.setReducerClass(WordCoundReducer.class);//设置reduce类
  55. job.setOutputKeyClass(Text.class);//设置输出key类型
  56. job.setOutputValueClass(IntWritable.class);//设置输出value类型
  57. //设置输出路径
  58. Path outPath=new Path(args[1]);
  59. FileOutputFormat.setOutputPath(job, outPath);
  60. boolean isSuccess=job.waitForCompletion(true);//等待作业完成
  61. return isSuccess ? 0 : 1;
  62. }
  63. public static void main(String[] args) throws Exception{
  64. args =new String[]{"hdfs://miaodonghua.host:8020/user/hadoop/input/",//
  65. "hdfs://miaodonghua.host:8020/user/hadoop/output3"};
  66. int status = new WordCoundMapreduce().run(args);
  67. System.exit(status);
  68. }
  69. }

在当前用了目录下创建imput目录,并将/opt/datas/wc.input上传:

  1. $bin/hdfs dfs -mkdir input/
  2. $bin/hdfs dfs -put /opt/datas/wc.input input/

在eclipse工作空间内运行程序。
build

3 将八股文形式的wordcount导出为jar包,以便在yarn上运行。

output jar
output jar1
output jar2
output jar3
output jar4
导出成功后,可在/opt/tools/目录下查看:
导出成功后,可在/opt/tools/目录下查看:

4,运行生成的wordcount.jar,命令如下:

  1. $bin/yarn jar /opt/tools/wordcount.jar com.hadoop.dfs.mapreduce.wordcountmapreduce /usr/hadoop/input /usr/hadoop/output

可以在webaap,查看到生成目录:
output
Mapreduce进行词频统计的时候主要分为两部分,即map和reduce.

(1) hdfs文件由inputSplit分割成不同的块,然后交由RecordReader按行形成对(这里key指的是行偏移,value指的是正行的内容)。

(2) map读取RecordReader形成的,经过处理生成,通过上下文context传递reduce。(这里key指的是单词,value指的是单词的数量.map生成的已经根据key进行了排序。)

(3)Reduce读取map生成的,然后将相同key的整合成使用context上下文写入到hdfs文件中.这里的key与value是用制表符分割。

注:map的数量由InputSplit决定,一个InputSplit对应一个map。而InputSplit的数量由文件的大小及数量决定。文件大小小于系统的block大小,则一个文件就对应一个InputSplit。如果文件大于了block块,则将文件按block分割一个分割块对应一个InputSplit。

完整的程序流程如下:
map to reduce

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注