[关闭]
@gekeshi 2016-04-30T14:54:33.000000Z 字数 6511 阅读 375

Hadoop wordcount,PageRank实例

Hadoop


运行环境

在Hadoop伪分布式环境下运行,具体环境为:

wordcount example

  1. 在hdfs中新建input目录,将README.txt文件上传到input中
  1. hadoop fs -mkdir input
  2. hadoop fs -put /usr/hadoop-2.6.4/README.txt input/

Hadoop 运行程序时,输出目录不能存在,否则会提示错误 “org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://localhost:9000/user/hadoop/output
already exists”
,因此若要再次执行,需要执行命令删除 output 文件夹
2. 运行自带的wordcount example

  1. hadoop jar /usr/hadoop-2.6.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount input/README.txt output/wordcount

YARN监控过程:
yarnword
输出结果为:
wordresult

PageRank

编写基于mapreduce的PageRank程序如下:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  23. public class PageRank {
  24. public static class PageMapper extends Mapper<LongWritable, Text, Text, Text>{
  25. private Text averageValue = new Text();
  26. private Text node = new Text();
  27. @Override
  28. //把每行数据的对应节点的分pagerank找出,并输出,当前节点的值除以指向节点的总数
  29. protected void map(LongWritable key, Text value,
  30. Context context)
  31. throws IOException, InterruptedException {
  32. String string = value.toString();
  33. String [] ss = string.split(",");
  34. int length = ss.length;
  35. double pageValue = Double.parseDouble(ss[1]);
  36. double average = pageValue/(length-2);
  37. averageValue.set(String.valueOf(average));
  38. int i = 2;
  39. while(i<=length-1){
  40. node.set(ss[i]);
  41. context.write(node,averageValue);
  42. i++;
  43. }
  44. }
  45. }
  46. public static class PageReducer extends Reducer<Text, Text, Text, Text>{
  47. private HashMap<String, String> content;
  48. private Text res = new Text();
  49. //reducer工作前,key相同的会分组分在一组,用迭代器操作,从总的图中找到所有该节点的分pagerank值
  50. //利用公式计算该pagerank值,输出。因为下一次要用,因此输出可以凑近一些,把结果都放在value里输出
  51. @Override
  52. protected void reduce(Text text, Iterable<Text> intIterable,
  53. Context context)
  54. throws IOException, InterruptedException {
  55. double sum = 0.0;
  56. double v = 0.0;
  57. for (Text t : intIterable) {
  58. v = Double.parseDouble(t.toString());
  59. sum = sum + v;
  60. }
  61. double a = 0.85;
  62. double result = (1-a)+ a*sum;
  63. String sRes = String.valueOf(result);
  64. String back = content.get(text.toString());
  65. String front = text.toString();
  66. String comp = front + "," + sRes + back;
  67. res.set(comp);
  68. context.write(null,res);
  69. }
  70. @Override
  71. //reducer的初始化时,先把节点对应文件的数据,存在hashmap中,也就是content中,供每次reduce方法使用,相当于数据库的作用
  72. //方便查询
  73. protected void setup(Context context)
  74. throws IOException, InterruptedException {
  75. URI[] uri = context.getCacheArchives();
  76. content = new HashMap<String, String>();
  77. for(URI u : uri)
  78. {
  79. FileSystem fileSystem = FileSystem.get(u.create("hdfs://master:9000"), context.getConfiguration());
  80. FSDataInputStream in = null;
  81. in = fileSystem.open(new Path(u.getPath()));
  82. BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
  83. String line;
  84. while((line = bufferedReader.readLine())!=null)
  85. {
  86. int index = line.indexOf(",");
  87. String first = line.substring(0,index);
  88. String last = line.substring(index,line.length());
  89. content.put(first, last);
  90. }
  91. }
  92. }
  93. }
  94. public static void main(String[] args) throws Exception{
  95. //接受路径文件
  96. Path inputPath = new Path(args[0]);
  97. Path outputPath = new Path(args[1]);
  98. Path cachePath = new Path(args[2]);
  99. double result = 100;
  100. int flag = 0;
  101. //制定差值多大时进入循环
  102. while(result>0.1)
  103. {
  104. if(flag == 1)
  105. {
  106. //初次调用mapreduce不操作这个
  107. //这个是把mapreduce的输出文件复制到输入文件中,作为这次mapreduce的输入文件
  108. copyFile();
  109. flag = 0;
  110. }
  111. Configuration configuration = new Configuration();
  112. Job job = Job.getInstance(configuration);
  113. job.setJarByClass(PageRank.class);
  114. job.setMapperClass(PageMapper.class);
  115. job.setReducerClass(PageReducer.class);
  116. job.setMapOutputKeyClass(Text.class);
  117. job.setMapOutputValueClass(Text.class);
  118. job.setOutputKeyClass(Text.class);
  119. job.setOutputValueClass(Text.class);
  120. job.setInputFormatClass(TextInputFormat.class);
  121. job.setOutputFormatClass(TextOutputFormat.class);
  122. FileInputFormat.addInputPath(job, inputPath);
  123. FileOutputFormat.setOutputPath(job, outputPath);
  124. job.addCacheArchive(cachePath.toUri());
  125. outputPath.getFileSystem(configuration).delete(outputPath, true);
  126. job.waitForCompletion(true);
  127. String outpathString = outputPath.toString()+"/part-r-00000";
  128. //计算两个文件的各节点的pagerank值差
  129. result = fileDo(inputPath, new Path(outpathString));
  130. flag = 1;
  131. }
  132. System.exit(0);
  133. }
  134. //计算两个文件的每个节点的pagerank差值,返回
  135. public static double fileDo(Path inputPath,Path outPath) throws Exception
  136. {
  137. Configuration conf = new Configuration();
  138. conf.set("fs.defaultFS", "hdfs://master:9000");
  139. FileSystem fs = FileSystem.get(conf);
  140. FSDataInputStream in1 = null;
  141. FSDataInputStream in2 = null;
  142. in1 = fs.open(inputPath);
  143. in2 = fs.open(outPath);
  144. BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
  145. BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
  146. String s1 = null;
  147. String s2 = null;
  148. ArrayList<Double> arrayList1 = new ArrayList<Double>();
  149. ArrayList<Double> arrayList2 = new ArrayList<Double>();
  150. while ((s1 = br1.readLine()) != null)
  151. {
  152. String[] ss = s1.split(",");
  153. arrayList1.add(Double.parseDouble(ss[1]));
  154. }
  155. br1.close();
  156. while ((s2 = br2.readLine()) != null)
  157. {
  158. String[] ss = s2.split(",");
  159. arrayList2.add(Double.parseDouble(ss[1]));
  160. }
  161. double res = 0;
  162. for(int i = 0;i<arrayList1.size();i++)
  163. {
  164. res = res + Math.abs(arrayList1.get(i)-arrayList2.get(i));
  165. }
  166. return res;
  167. }
  168. //将输出文件复制到输入文件中
  169. public static void copyFile() throws Exception
  170. {
  171. Configuration conf = new Configuration();
  172. conf.set("fs.defaultFS", "hdfs://master:9000");
  173. FileSystem fs = FileSystem.get(conf);
  174. FSDataInputStream in1 = null;
  175. in1 = fs.open(new Path("output/part-r-00000"));
  176. BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
  177. //这里删除需要打开hdfs在/input目录下的权限操作,非常重要
  178. //“hdfs dfs -chmod 777 /input”打开权限,这样才可以删除其下面的文件
  179. fs.delete(new Path("input/test2.txt"),true);
  180. //建立一个新文件,返回流
  181. FSDataOutputStream fsDataOutputStream = fs.create(new Path("input/test2.txt"));
  182. BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream));
  183. String s1 = null;
  184. //写出并写入
  185. while ((s1 = br1.readLine()) != null)
  186. {
  187. bw1.write(s1);
  188. bw1.write("\n");
  189. }
  190. bw1.close();
  191. fsDataOutputStream.close();
  192. br1.close();
  193. in1.close();
  194. }
  195. }

将 Hadoop 的 classpath 信息添加到 CLASSPATH 变量中,在 ~/.bashrc 中增加如下几行:

export HADOOP_HOME=/usr/local/hadoop
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH

执行 source ~/.bashrc 使变量生效,接着就可以通过 javac 命令编译

javac PageRank.java

接着把 .class 文件打包成 jar

jar -cvf PageRank.jar ./PageRank*.class

上传测试数据input.txt到hdfs:

A,B,C,D,E,F
B,C,D,E,F
C,D,E,F
D,E,F
E,F
F,A,B,C,D,E

执行程序:

 hadoop jar PageRank.jar PageRank input/input.txt output cache

yarnpage

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注