[关闭]
@zhengyuhong 2014-10-09T01:11:43.000000Z 字数 14123 阅读 1067

Hadoop技术内幕

Hadoop 读书笔记


深入解析Hadoop Common和HDFS架构设计与实现原理

第一章、源代码环境准备

1.1、什么是Hadoop

  Hadoop是Apache基金会下的一个开源分布式计算平台,以Hadoop分布式文件系统HDFS和分布式计算框架MapReduce为核心,为用户提供了底层细节透明的分布式基础设施。HDFS的高容错性、高伸缩性等优点允许用户将Hadoop部署在廉价的硬件上,构建分布式系统;MapReduce则允许用户在不了解分布式系统底层细节的情况下开发、并行应用程序。

Hadoop生态系统

其中,我个人主要学习研究的是Common、MapReduce与HDFS

Hadoop Common

  Common为Hadoop的其他项目提供了 一些常用工具,主要包括系统配置工具Configuration、远程调用RPF、序列号机制和Hadoop抽象文件系统FileSystem等。它们为在通用硬件上搭建云计算环境提供基础服务,并提供软件开发所需的API

HDFS

  HDFS是Hadoop体系中数据存储管理的基础。它是一个高度容错的系统,能检测与应对硬件故障,用于在低成本的通用硬件上运行。HDFS简化了文件的一致性模型,通过流式数据访问,提供高吞吐量应用程序数据访问功能,适合带有大型数据集的应用程序

MapReduce

  MapReduce是一种分布式计算框架,用以进行大数据量的计算。Common、HDFS与MapReduce是Hadoop发展初期的三个组件。MapReduce将应用划分为Map与Reduce两个步骤,其中Map对数据集上的独立元素进行指定的操作,生成键值对形式中间结果。Reduce则对中间结果中相同的“键”的所有“值”进行规约,以得到最终结果。MapReduce这样的功能划分,非常适合分布式计算。

第二章、Hadoop配置信息处理

2.1、Hadoop Configuration详解

Hadoop提供了一套独有的配置文件管理系统org.apache.hadoop.conf.Configuration,并提供自己的API

  1. <?xml version="1.0"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <!-- Put site-specific property overrides in this file. -->
  4. <configuration>
  5. <property>
  6. <name>fs.default.name</name>
  7. <value>hdfs://localhost:9000</value>
  8. </property>
  9. <property>
  10. <name>hadoop.tmp.dir</name>
  11. <value>/home/hadoop/hadoop-1.2.1/hadoop_tmp</value>
  12. <description>A base for other temporary directories.</description>
  13. </property>
  14. </configuration>

  Hadoop配置文件根元素是configuration,一般只包含子元素property,每一个property就是一个配置项,配置文件不支持分层或分级。每一个配置项一般包括配置属性的名称name、值value和一个关于配置项的描述description;元素final和Java的关键词final类似,表示配置项是“固定不可变的”。final一般不出现,但是在合并配置文件时就可以防止配置项被后来出现的值覆盖,如core-default.xmlcore-site.xml
  在Configuration中,每一个属性名称都是String类型,但是值类型可能是以下多种类型,包括Java中的基本类型,以及String、File与String[]等
  Hadoop配置系统还有一个很重要的功能,就是属性扩展。如配置项dfs.name.dir的值是$(hadoop.tmp.dir)/dfs/name,其中,$(hadoop.tmp.dir)会使用Configuration中相应的属性值进行扩展
  使用Configuration类的一般过程是:构造Configuration对象,并通过类的addResource方法添加需要加载的资源(如mapred-site.xml);然后使用get*set*来访问与设置配置项
  addResource有四个重载函数

  1. public void addResource(String name) {//载入CLASSPATH资源
  2. addResourceObject(name);
  3. public void addResource(URL url) {//载入URL资源,如www.example.com/core-site.xml
  4. addResourceObject(url);
  5. }
  6. public void addResource(Path file) {//载入HDFS资源
  7. addResourceObject(file);
  8. }
  9. public void addResource(InputStream in) {//载入输入流中的资源
  10. addResourceObject(in);
  11. }

2.2、Configuration成员变量

  1. private boolean loadDefaults = true;
  2. private static final CopyOnWriteArrayList<String> defaultResources =
  3. new CopyOnWriteArrayList<String>();

  布尔变量loadDefaults用于确定是否加载默认资源,这些默认资源保存在defaultResources中。在HDFS中,会把hdfs-default.xml和hdfs-site.xml作为默认资源,并通过addDefaultResource保存在成员变量defaultResources;在MapReduce中,默认资源是mapre-default.xml和mapred-site.xml,如在HDFS的DataNode中就有如下代码:

  1. static{
  2. Configuration.addDefaultResource("mapred-default.xml");
  3. Configuration.addDefaultResource("mapred-site.xml");
  4. }

Properties是java.util.Properties类,用于处理简单的配置文件,简单来说,Properties对象存放配置项的键值对。

  1. private Set<String> finalParameters = new HashSet<String>();
  2. private Properties properties;
  3. private Properties overlay;

properties存放Hadoop配置文件解析后的键值对
finalParameters用于保存所有在配置文件中被声明为final的配置项键值对
overlay用于记录通过set*()方法改变的配置项,也就是说overlay中的键值对应该是通过设置的,而不是通过对配置资源解析得到的。

2.3、资源加载

  资源通过对象的addResource()方法或者类的静态方法addDefaultResource()添加到Configuration对象中,添加的资源并不会立即被加载,只是通过reloadConfiguration()方法清空了propertiesfinalParameters

  1. private ArrayList<Object> resources = new ArrayList<Object>();
  2. public void addResource(String name) {
  3. addResourceObject(name);
  4. }
  5. private synchronized void addResourceObject(Object resource) {
  6. resources.add(resource); // add to resources
  7. reloadConfiguration();
  8. }
  9. public synchronized void reloadConfiguration() {
  10. properties = null; // trigger reload
  11. finalParameters.clear(); // clear site-limits
  12. }

  静态方法addDefaultResource()也能清空Configuration对象中的数据(非静态成员),这是通过类的静态成员REGISTRY作为媒介进行的。

  1. private static final WeakHashMap<Configuration,Object> REGISTRY =
  2. new WeakHashMap<Configuration,Object>();

  REGISTRY记录了系统中所有Configuration对象,所有addDefaultResource()被调用时,遍历REGISTRY中的元素并在元素上调用reloadConfiguration()即可触发资源的重新加载。相关代码如下:

  1. public static synchronized void addDefaultResource(String name) {
  2. if(!defaultResources.contains(name)) {
  3. defaultResources.add(name);
  4. for(Configuration conf : REGISTRY.keySet()) {
  5. if(conf.loadDefaults) {
  6. conf.reloadConfiguration();
  7. }
  8. }
  9. }
  10. }

  成员变量properyies中的数据指导需要的时候才会加载进来。在getProps()方法中如果发现properties为空,将触发loadResources()加载配置资源,采用了延迟加载设计模型,真正需要数据时菜开始分析配置文件。

  1. private synchronized Properties getProps() {
  2. if (properties == null) {
  3. properties = new Properties();
  4. loadResources(properties, resources, quietmode);
  5. if (overlay!= null) {
  6. properties.putAll(overlay);
  7. for (Map.Entry<Object,Object> item: overlay.entrySet()) {
  8. updatingResource.put((String) item.getKey(), UNKNOWN_RESOURCE);
  9. }
  10. }
  11. }
  12. return properties;
  13. }
  14. private void loadResources(Properties properties,
  15. ArrayList resources,
  16. boolean quiet) {
  17. if(loadDefaults) {
  18. for (String resource : defaultResources) {
  19. loadResource(properties, resource, quiet);
  20. }
  21. //support the hadoop-site.xml as a deprecated case
  22. if(getResource("hadoop-site.xml")!=null) {
  23. loadResource(properties, "hadoop-site.xml", quiet);
  24. }
  25. }
  26. for (Object resource : resources) {
  27. loadResource(properties, resource, quiet);
  28. }
  29. }

2.4、使用get*set*访问/设置配置项

  get*方法根据配置项的键获取对应的值,如果键不存在,则返回默认值defaultValue

  1. public String get(String name) {
  2. return substituteVars(getProps().getProperty(name));
  3. }

  Configuration.get()会调用私有方法substituteVars(),该方法完全完成配置的属性扩展,属性扩展是指配置项的值包含$(key)这种格式的变量,这些变量会被自动递归替换成相应的值,也就是说$(key)后被替换成以key为键的配置项的值,注意,如果$(key)中替换后配置项依然有变量,过程会递归下去。会有最大次数防止死循环。
  set*最终都是调用了下面的Configuration.set()方法

  1. /**
  2. * Stores the mapping of key to the resource which modifies or loads
  3. * the key most recently
  4. */
  5. private HashMap<String, String> updatingResource;
  6. public void set(String name, String value) {
  7. getOverlay().setProperty(name, value);
  8. getProps().setProperty(name, value);
  9. this.updatingResource.put(name, UNKNOWN_RESOURCE);
  10. }

2.5、Configurable接口

  Configurable是一个很简单的接口,也位于org.apache.hadoop.conf包中。如果一个类实现了这个接口,意味这个类可以配置,也就是说,可以通过为这个类的对象传入一个Configuration实例,提供对象工作需要的一些配置信息。Hadoop的代码中有大量的类实现了Configurable接口,如org.apache.hadoop.mapred.SequenceFilInputFilter.RegexFilter,RegexFilter对象工作时需要提供正则表达式,用于过滤读取的记录,由于RegexFilter的父类Filter中实现了Configurable接口,RegexFilter可以在它的setConf()方法中,使用Configuration.get()方法获取以字符串传入的正则表达式,并初始化成员变量p,相关代码如下:

  1. private Pattern p;
  2. public void setConf(Configuration conf) {
  3. String regex = conf.get(FILTER_REGEX);
  4. if (regex==null)
  5. throw new RuntimeException(FILTER_REGEX + "not set");
  6. this.p = Pattern.compile(regex);
  7. this.conf = conf;
  8. }

第三章、序列化与压缩

  对象序列化用于将对象编码成为一个字节流,反序列化用于从字节流中重新构造对象。
  
  序列化有三个主要用途:

3.1、Java内建序列化机制

  在Java中,使一个类的对象可以被序列化非常简单,只需要在类声明时加入Serializable接口即可。Serializable接口时一个标志,不具有任何成员函数,其定义如下:

  1. public interface Serializable{
  2. }

Serializable 接口 没有任何方法, 所以不需要对类进行修改, Block 类通过声明它实现了Serializable 接口 , 立即可以获得 Java 提供的序列化功能。

  1. public class Block implements Writable, Comparable<Block>, Serializable

  由于序列化主要应用在与 I/O 相关的一些操作上, 其实现是通过一对输入 / 输出流来实现的。 如果想对某个对象执行序列化动作, 可以在某种 OutputStream 对象( 后面还会讨论Java 的流) 的基础上创建一个对象流 ObjectOutputStream 对象, 然后调用writeObject()就可达到目的。
writeObject() 方法负责写入实现了Serializable接口对象的状态信息,输出数据将被送至该OutputStream。多个对象的序列化可以在ObjectOutputStream对象上多次调用writeObject(),分别写入这些对象。 下面是序列化一个 Block 对象的例子:

  1. Block block1=new Block(7806259420524417791L, 39447755L, 56736651L);
  2. Block block2=new Block(5547099594945187683L, 67108864L, 56736828L);
  3. ……
  4. ByteArrayOutputStream out=new ByteArrayOutputStream();
  5. // 在 ByteArrayOutputStream 的基础上创建 ObjectOutputStream
  6. ObjectOutputStream objOut=new ObjectOutputStream(out);
  7. // 对 block 进行序列化
  8. objOut.writeObject(block1);

  对于Java基本类型的序列化,ObjectOutputStream提供了writeBoolean()writeByte()等方法。输入过程类似, 将InputStream包装在ObjectInputStream中并调用readObject(),该方法返回一个指向向上转型后的 Object 的引用,通过向下转型,就可以得到正确结果。 读取对象时, 必须要小心地跟踪存储的对象的数量、 顺序以及它们的类型。
  序列化的结果中 包含了 大量与类相关的信息

3.2、Hadoop 序列化机制

  和 Java 序列化机制不同(在对象流 ObjectOutputStream对象上调用writeObject() 方法),Hadoop的序列化机制通过调用对象的 write() 方法( 它带有一个类型为DataOutput的参数), 将对象序列化到流中。反序列化的过程也是类似, 通过对象的readFields(), 从流中读取数据。 值得一提的是, Java 序列化机制中,反序列化过程会不断地创建新的对象, 但在Hadoop 的序列化机制的反序列化过程中, 用户可以复用对象: 如, 在 Block 的某个对象上反复调用 readFields(),可以在同一个对象上得到多个反序列化的结果, 而不是多个反序列化的结果对象( 对象被复用了), 这减少了 Java 对象的分配和回收, 提高了应用的效率。

  1. public static void main(String[] args) {
  2. try {
  3. Block block1=new Block(7806259420524417791L, 39447755L, 56736651L);
  4. ……
  5. ByteArrayOutputStream bout = new ByteArrayOutputStream();
  6. DataOutputStream dout=new DataOutputStream(bout);
  7. block1.write(dout); // 序列化对象到输出流 dout 中
  8. dout.close();
  9. System.out.println(……);
  10. SerializationExample.print16(out.toByteArray(), bout.size());
  11. }
  12. ……
  13. }

  由于Block对象序列化时只输出了3个长整数, block1的序列化结果一共有 24 字节, 如下所示。 和 Java 的序列化机制的输出结果对比, Hadoop的序列化结果紧凑而且快速。

3.3、Hadoop序列化机制的特征

对于处理大规模数据的 Hadoop 平台, 其序列化机制需要具有如下特征:

3.4、Hadoop Writable 机制

为了支持以上这些特性,Hadoop引入org.apache.hadoop.io.Writable接口 , 作为所有可序列化对象必须实现的接口

  1. public interface Writable{
  2. public void readFields(DataInput in);
  3. //Deserialize the fields of this object from in.
  4. public void write(DataOutput out);
  5. //Serialize the fields of this object to out.
  6. }

  Writable.write() 方法用 于将对象状态写入二进制的 DataOutput 中, 反序列化的过程由readFields()DataInput 流中读取状态完成。 下面是一个例子:

  1. public class Block implements Writable, Comparable<Block>, Serializable {
  2. private long blockId;
  3. private long numBytes;
  4. private long generationStamp;
  5. public void write(DataOutput out) throws IOException {
  6. out.writeLong(blockId);
  7. out.writeLong(numBytes);
  8. out.writeLong(generationStamp);
  9. }
  10. public void readFields(DataInput in) throws IOException {
  11. this.blockId = in.readLong();
  12. this.numBytes = in.readLong();
  13. this.generationStamp = in.readLong();
  14. if (numBytes < 0) {
  15. throw new IOException("Unexpected block size: " + numBytes);
  16. }
  17. }
  18. }

  这个例子使用的是前面分析 Java 序列化机制的 Block 类, Block 实现了 Writable 接口 ,即需要实现 write() 方法和 readFields() 方法, 这两个方法的实现都很简单 : Block 有三个成员 变量, write() 方法简单地把这三个变量写入流中, 而 readFields() 则从流中 依次读入这些数据, 并做必要的检查。
  Hadoop 序列化机制中还包括另外几个重要接口 : WritableComparableRawComparatorWritableComparator
  WritableComparable, 顾名思义, 它提供类型比较的能力,这对MapReduce至关重要。该接口继承自Writable接口和 Comparable接口,其中Comparable用于进行类型比较。ByteWritableIntWritableDoubleWritable 等 Java 基本类型对应的 Writable 类型, 都继承自WritableComparable
  效率在Hadoop中非常重要,因此 HadoopI/O 包中提供了具有高效比较能力的 RawComparator 接口 。RawComparator 接口允许执行者比较流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。 其中, compare() 比较时需要的两个参数所对应的记录位于字节数组 b1 和 b2 的指定开始位置 s1 和 s1, 记录长度为 l1 和 l2, 代码如下:

  1. public interface RawComparator<T> extends Comparator<T> {
  2. public int compare(byte[] b1, int s1, int l1,
  3. byte[] b2, int s2, int l2);
  4. }

IntWritable 为例,它的RawComparator实现中( WritableComparator 是一个辅助类, 实现了RawComparator接口),compare() 方法通过 readInt() 直接在字节数组中 读入需要比较的两个整数, 然后输出 Comparable 接口 要求的比较结果。 值得注意的是, 该过程中compare() 方法避免使用 IntWritable 对象, 从而避免了不必要的对象分配。 相关代码如下:

  1. public static class Comparator extends WritableComparator {
  2. public int compare(byte[] b1, int s1, int l1,
  3. byte[] b2, int s2, int l2) {
  4. int thisValue = readInt(b1, s1);
  5. int thatValue = readInt(b2, s2);
  6. return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  7. }
  8. }

WritableComparatorRawComparatorWritableComparable 类的一个通用实现。 它提供两个主要功能。首先,提供了一个RawComparator的compare()默认实现, 该实现从数据流中 反序列化要进行比较的对象,然后调用对象的 compare() 方法进行比较( 这些对象都是Comparable 的)。 其次, 它充当了 RawComparator实例的一个工厂方法, 例如, 可以通过如下代码获得 IntWritableRawComparator

  1. RawComparator<IntWritable>comparator =
  2. WritableComparator.get(IntWritable.class);

3.5、典型的Writable类详解

  目前Java基本类型对应的 Writable 封装如表 3-1 所示。 所有这些Writable 类都继承自WritableComparable。 也就是说,它们是可比较的。 同时,它们都有 get()set() 方法, 用于获得和设置封装的值。
  对整型( intlong) 进行编码的时候, 有固 定长度格式( IntWritable 和LongWritable)和可变长度格式(VIntWritableVLongWritable) 两种选择。 固定长度格式的整型, 序列化后的数据是定长的,而可变长度格式则使用一种比较灵活的编码方式, 对
于数值比较小的整型, 它们往往比较节省空间。 同时, 由 于 VIntWritableVLongWritable的编码规则是一样的, 所以VIntWritable 的输出可以 用VLongWritable读入。下面以VIntWritable 为例, 说明 Writable 的 Java 基本类封装实现。 代码如下:

  1. public class VIntWritable implements WritableComparable {
  2. private int value;
  3. // 设置 VIntWritable 的值
  4. public void set(int value) {
  5. this.value = value;
  6. }
  7. // 获取 VIntWritable 的值
  8. public int get() {
  9. return value;
  10. }
  11. public void readFields(DataInput in) throws IOException {
  12. value = WritableUtils.readVInt(in);
  13. }
  14. public void write(DataOutput out) throws IOException {
  15. WritableUtils.writeVInt(out, value);
  16. }
  17. }

首先,每个Java基本类型的Writable 封装, 其类的内部都包含一个对应基本类型 的Java 的基本类型 shortchar 并没有对应的 Writable 类, 它们可以保持在 IntWritable 中。而 Writable
接口要求的readFields()write() 方 法, VIntWritable 则是通 过调用Writable 工具类中提供的readVInt()writeVInt() 读 / 写数据。方法readVInt()writeVInt() 的实现也只是简单调用了readVLong()writeVLong(), 所以,通过writeVInt() 写的数据自然可以通过readVLong() 读入。

3.6、ObjectWritable类的实现

  ObjectWritablewrite 方法调用 的是静态方法ObjectWritable.writeObject(), 该方法可以往 DataOutput 接口 中写入各种 Java 对象。

  1. public class ObjectWritable implements Writable, Configurable {
  2. private Class declaredClass;
  3. private Object instance;
  4. private Configuration conf;
  5. public void readFields(DataInput in) throws IOException {
  6. readObject(in, this, this.conf);
  7. }
  8. public void write(DataOutput out) throws IOException {
  9. writeObject(out, instance, declaredClass, conf);
  10. }
  11. public void write(DataOutput out) throws IOException {
  12. UTF8.writeString(out, declaredClass.getName());
  13. }
  14. public static void writeObject(DataOutput out, Object instance,
  15. Class declaredClass,
  16. Configuration conf) throws IOException ;
  17. public static Object readObject(DataInput in, Configuration conf)
  18. throws IOException {
  19. return readObject(in, null, conf);
  20. }
  21. public static Object readObject(DataInput in,
  22. ObjectWritable objectWritable,
  23. Configuration conf) throws IOException;
  24. }

  writeObject() 方法先输出 对象的类名( 通过对象对应的 Class 对象的 getName() 方法获得), 然后根据传入对象的类型,分情况系列化对象到输出流中, 也就是说, 对象通过该方法输出对象的类名, 对象序列化结果对到输出流中。 在 ObjectWritable.writeObject() 的逻辑中, 需要分别处理 null、 Java 数组、 字符串 String、 Java 基本类型、 枚举和 Writable 的子类6 种情况, 由于类的继承, 处理 Writable 时, 序列化的结果包含对象类名, 对象实际类名和对象序列化结果三部分。
  为什么需要对象实际类名呢? 根据Java的单根继承规则, ObjectWritable 中传入的declaredClass, 可以是传入 instance 对象对应的类的类对象, 也可以是 instance 对象的父类的类对象。 但是, 在序列化和反序列化的时候, 往往不能使用父类的序列化方法( 如 write
方法) 来序列化子类对象, 所以, 在序列化结果中必须记住对象实际类名。
  和输出对应, ObjectWritablereadFields() 方法调用的是静态方法 ObjectWritable.readObject (), 该方法的实现和 writeObject() 类似。

3.6、小结

  Writable 接口通过 write()readFields() 方法声明了序列化和反序列化的功能。 在此基础上,分析了Writable 的一些典型子类的实现, 包括 Java 基本类型对应的 Writable 封装和 ObjectWritable,它们为用户使用 Hadoop 提供了很多方便。

第四章、Hadoop远程过程调用(Remote Procedure Call,RPC)

4.1RPC原理

  RPC就是允许程序调用位于其他机器上的进程。本地机器client需要调用一个远程进程的时候,通过把参数打包成为一个消息,并附加被调用进程的名字(绝对名称,包括服务器IP地址、端口和进程名称),然后发送消息到服务器,本地机器发送完消息之后,必须等待服务器的应答,这个时候,执行流是空闲的。
  RPC的远程机器server运行时会阻塞在接收消息的调用上,当接到client的请求时,它会解包以获取请求参数,这类似传统进程调用,被调用函数从栈中接收参数,然后确定调用进程的名字并调用相应进程。调用结束后,返回值通过主程序打包发挥给client,通过client调用结束。

4.2、Java NIO

Java基本套接字
  socket是两台主机间的一个链接,可以进行7项基本操作:

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注