@gekeshi
2016-04-30T14:54:33.000000Z
Hadoop
Everything below runs in a Hadoop pseudo-distributed environment.

1. Upload the test file to HDFS:
hadoop fs -mkdir input
hadoop fs -put /usr/hadoop-2.6.4/README.txt input/
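The upload can be verified by listing the directory:
hadoop fs -ls input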
When running a Hadoop program, the output directory must not already exist; otherwise the job fails with the error "org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://localhost:9000/user/hadoop/output already exists". To run the job again, the output folder has to be deleted first.
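For example, with the output path from the error above:
hadoop fs -rm -r output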
2. Run the bundled wordcount example:
hadoop jar /usr/hadoop-2.6.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount input/README.txt output/wordcount
The job can be monitored through the YARN web UI (by default at http://localhost:8088):
The output is as follows:
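The result can also be read straight from HDFS; part-r-00000 is the default name of the single reducer's output file:
hadoop fs -cat output/wordcount/part-r-00000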

The MapReduce-based PageRank program is as follows:
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank {

    public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text averageValue = new Text();
        private Text node = new Text();

        // For each input line "node,rank,outlink1,outlink2,...", divide the node's
        // current PageRank by its number of outlinks and emit that share to every outlink.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] ss = value.toString().split(",");
            int length = ss.length;
            double pageValue = Double.parseDouble(ss[1]);
            double average = pageValue / (length - 2);
            averageValue.set(String.valueOf(average));
            int i = 2;
            while (i <= length - 1) {
                node.set(ss[i]);
                context.write(node, averageValue);
                i++;
            }
        }
    }

    public static class PageReducer extends Reducer<Text, Text, Text, Text> {
        private HashMap<String, String> content;
        private Text res = new Text();

        // Before reduce() runs, values with the same key (node) arrive grouped.
        // Iterate over them to sum all the partial PageRank shares sent to this node,
        // apply the damping formula, and emit. Because the next iteration consumes
        // this output, the whole line (node, new rank, outlinks) is packed into the
        // value so the output format matches the input format.
        @Override
        protected void reduce(Text text, Iterable<Text> intIterable, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            for (Text t : intIterable) {
                sum += Double.parseDouble(t.toString());
            }
            double a = 0.85;
            double result = (1 - a) + a * sum;
            String sRes = String.valueOf(result);
            String back = content.get(text.toString()); // ",outlink1,outlink2,..."
            String front = text.toString();
            res.set(front + "," + sRes + back);
            context.write(null, res); // null key: only the value line is written
        }

        // When the reducer is initialized, load the adjacency file from the
        // distributed cache into a HashMap (content) so every reduce() call can
        // look up a node's outlinks quickly; it acts like a small database.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] uris = context.getCacheArchives();
            content = new HashMap<String, String>();
            for (URI u : uris) {
                FileSystem fileSystem = FileSystem.get(URI.create("hdfs://master:9000"),
                        context.getConfiguration());
                FSDataInputStream in = fileSystem.open(new Path(u.getPath()));
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    int index = line.indexOf(",");
                    String first = line.substring(0, index);            // node name
                    String last = line.substring(index, line.length()); // ",outlinks..."
                    content.put(first, last);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Paths from the command line: input file, output directory, cache file.
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Path cachePath = new Path(args[2]);
        double result = 100;
        int flag = 0;
        // Iterate until the total rank difference between two rounds drops below 0.1.
        while (result > 0.1) {
            if (flag == 1) {
                // Skipped on the first round: copy the previous job's output back
                // into the input directory so it becomes this round's input, and
                // point the job at the refreshed file.
                copyFile();
                inputPath = new Path("input/test2.txt");
                flag = 0;
            }
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(PageRank.class);
            job.setMapperClass(PageMapper.class);
            job.setReducerClass(PageReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.addInputPath(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            // Registered as a cache archive; setup() reads it directly from HDFS.
            job.addCacheArchive(cachePath.toUri());
            outputPath.getFileSystem(configuration).delete(outputPath, true);
            job.waitForCompletion(true);
            String outpathString = outputPath.toString() + "/part-r-00000";
            // Total per-node PageRank difference between this round's input and output.
            result = fileDo(inputPath, new Path(outpathString));
            flag = 1;
        }
        System.exit(0);
    }

    // Compute and return the summed per-node PageRank difference between two files.
    public static double fileDo(Path inputPath, Path outPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in1 = fs.open(inputPath);
        FSDataInputStream in2 = fs.open(outPath);
        BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
        String s1;
        String s2;
        ArrayList<Double> arrayList1 = new ArrayList<Double>();
        ArrayList<Double> arrayList2 = new ArrayList<Double>();
        while ((s1 = br1.readLine()) != null) {
            String[] ss = s1.split(",");
            arrayList1.add(Double.parseDouble(ss[1]));
        }
        br1.close();
        while ((s2 = br2.readLine()) != null) {
            String[] ss = s2.split(",");
            arrayList2.add(Double.parseDouble(ss[1]));
        }
        br2.close();
        double res = 0;
        for (int i = 0; i < arrayList1.size(); i++) {
            res = res + Math.abs(arrayList1.get(i) - arrayList2.get(i));
        }
        return res;
    }

    // Copy the output file back into the input directory for the next iteration.
    public static void copyFile() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in1 = fs.open(new Path("output/part-r-00000"));
        BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
        // Important: deleting here requires write permission on /input in HDFS.
        // Run "hdfs dfs -chmod 777 /input" first, otherwise the file underneath
        // cannot be deleted.
        fs.delete(new Path("input/test2.txt"), true);
        // Create a new file and get an output stream to it.
        FSDataOutputStream fsDataOutputStream = fs.create(new Path("input/test2.txt"));
        BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream));
        String s1;
        // Read each output line and write it into the new input file.
        while ((s1 = br1.readLine()) != null) {
            bw1.write(s1);
            bw1.write("\n");
        }
        bw1.close();
        fsDataOutputStream.close();
        br1.close();
        in1.close();
    }
}
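To make the update rule concrete, here is a minimal standalone sketch (plain Java, no Hadoop; the three-node graph and the class name are made up purely for illustration) of one iteration of the same computation the mapper and reducer perform together: each node sends rank/outdegree to its outlinks, and each receiving node applies result = (1-a) + a*sum with a = 0.85.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PageRankStep {
    public static void main(String[] args) {
        // Hypothetical three-node graph: node -> outlinks (illustration only).
        Map<String, List<String>> links = new LinkedHashMap<String, List<String>>();
        links.put("A", Arrays.asList("B", "C"));
        links.put("B", Arrays.asList("C"));
        links.put("C", Arrays.asList("A"));

        // Initial rank 1 for every node, as in the job's input file.
        Map<String, Double> rank = new LinkedHashMap<String, Double>();
        Map<String, Double> next = new LinkedHashMap<String, Double>();
        for (String n : links.keySet()) {
            rank.put(n, 1.0);
            next.put(n, 0.0);
        }

        // "Map" step: each node sends rank/outdegree to every outlink.
        for (Map.Entry<String, List<String>> e : links.entrySet()) {
            double share = rank.get(e.getKey()) / e.getValue().size();
            for (String target : e.getValue()) {
                next.put(target, next.get(target) + share);
            }
        }

        // "Reduce" step: apply the damping formula to the summed shares.
        double a = 0.85;
        for (String n : next.keySet()) {
            next.put(n, (1 - a) + a * next.get(n));
        }

        System.out.println(next); // roughly {A=1.0, B=0.575, C=1.425}
    }
}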
Add Hadoop's classpath information to the CLASSPATH variable by appending the following lines to ~/.bashrc:
export HADOOP_HOME=/usr/hadoop-2.6.4
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH
Run source ~/.bashrc to make the variables take effect; the program can then be compiled with javac:
javac PageRank.java
Then package the .class files into a jar:
jar -cvf PageRank.jar ./PageRank*.class
Upload the test data to HDFS. The link graph is the following adjacency list (each node followed by its outlinks), which is exactly the format of the cache file. Each line of the job input input.txt additionally carries the node's current rank right after its name (e.g. A,1,B,C,D,E,F, starting from rank 1), since the mapper parses the second field as a double:
A,B,C,D,E,F
B,C,D,E,F
C,D,E,F
D,E,F
E,F
F,A,B,C,D,E
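Assuming the adjacency list above is saved locally as cache.txt and the rank-carrying input file as input.txt (both local file names are only for illustration), the files can be uploaded to the paths the run command expects:
hadoop fs -put input.txt input/
hadoop fs -put cache.txt cache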
Run the program (arguments: input file, output directory, cache file):
hadoop jar PageRank.jar PageRank input/input.txt output cache
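When the loop converges, the final ranks are in the last iteration's output file (the same output/part-r-00000 path that copyFile() reads):
hadoop fs -cat output/part-r-00000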
