@Arslan6and6 2016-08-29T02:28:40.000000Z

[Assignment 24] 殷杰

Chapter 14: Basic Usage of the Distributed Database HBase

--- HBase Architecture and Usage

Assignment description:
The architecture of the big-data (Hadoop) database HBase, its data read/write flow, Java API usage, integration with MapReduce, and data import are fundamentals that must be mastered in real projects. Pay particular attention to the following points:
1) The HBase architecture diagram, understood thoroughly, together with the corresponding HDFS directories and the znodes in Zookeeper;
2) The role of the hbase:meta metadata table in HBase, and how to troubleshoot related problems in production;
3) Basic HBase Java API usage, including PUT, GET, SCAN, and DELETE;
4) Classpath considerations when integrating HBase with MapReduce, how to read data from and write data to HBase tables, and MapReduce programming practice;
5) Common HBase data-migration approaches in production, usage of the importtsv tool, and how to write a MapReduce job that converts log data into HFiles and then bulk loads them into an HBase table for fast data import.

In-Depth Summary of the HBase Architecture

(Figures: HBase architecture diagrams)

The hbase:meta Metadata Table in HBase

hbase:meta records every region of every user table (its table name, start and end row keys, and the RegionServer currently serving it). A client first asks ZooKeeper for the location of hbase:meta, reads it to find the region that holds the target row key, and then talks to that RegionServer directly.

Basic HBase Java API Usage: PUT, GET, SCAN, and DELETE

Environment Setup

1. Create a new Maven project and edit its pom.xml as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.apache.hbase</groupId>
      <artifactId>beifeng-hbase</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
      <name>beifeng-hbase</name>
      <url>http://maven.apache.org</url>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.5.0</hadoop.version>
        <hive.version>0.13.1</hive.version>
        <hbase.version>0.98.6-hadoop2</hbase.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.10</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.5.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>${hbase.version}</version>
        </dependency>
      </dependencies>
    </project>

2. Under the Maven project, create the resource folder /src/main/resource and copy in four files: hdfs-site.xml, core-site.xml, hbase-site.xml, and log4j.properties.

    cp /opt/modules/hadoop-2.5.0-cdh5.3.6/etc/hadoop/hdfs-site.xml /home/beifeng/workspace/beifeng-hbase/src/main/resource/
    cp /opt/modules/hadoop-2.5.0-cdh5.3.6/etc/hadoop/core-site.xml /home/beifeng/workspace/beifeng-hbase/src/main/resource/
    cp /opt/modules/hbase-0.98.6-hadoop2/conf/hbase-site.xml /home/beifeng/workspace/beifeng-hbase/src/main/resource/
    cp /home/beifeng/workspace/studyhdfs/src/main/resource/log4j.properties /home/beifeng/workspace/beifeng-hbase/src/main/resource/

3. Copy the prepared repository folder, which contains the pre-downloaded artifacts including hbase-client, into /home/beifeng/.m2.
4. Update the Maven project so that the new dependencies are resolved.

Operating HBase from Java

The Configuration class

An org.apache.hadoop.conf.Configuration object carries the client's connection settings; it is normally obtained from HBaseConfiguration.create(), which loads hbase-site.xml (together with the Hadoop configuration files) from the classpath.

HTable

In HBase, the HTable class encapsulates a table object; all per-table reads and writes go through it.
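A minimal, self-contained connection sketch follows (assuming the configuration files from the environment setup above are on the classpath and that a table named user already exists; the table name is only an example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;

    public class HBaseConnectDemo {
        public static void main(String[] args) throws Exception {
            // HBaseConfiguration.create() loads hbase-site.xml (and the Hadoop configs) from the classpath
            Configuration conf = HBaseConfiguration.create();
            // HTable wraps a single table; "user" is an example table name
            HTable table = new HTable(conf, "user");
            try {
                // ... Get / Put / Scan / Delete operations go here (see the sketches below) ...
            } finally {
                table.close(); // always release the table handle
            }
        }
    }

The Get, Put, Scan, and Delete sketches below are written as static helper methods that reuse a table handle opened this way.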

Using Get to Retrieve Data

First import the Get class: import org.apache.hadoop.hbase.client.Get;
Then fetch the row whose row key equals a given value. The syntax is:
Get get = new Get(<row key as a byte array>);
For example:
Get get = new Get(Bytes.toBytes("10001"));
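Putting the steps together, here is a minimal sketch of a complete Get (reusing a table handle opened as in the connection sketch above; the row key and the info:name column are only examples):

    // requires java.io.IOException, org.apache.hadoop.hbase.client.{Get, HTable, Result},
    // org.apache.hadoop.hbase.{Cell, CellUtil} and org.apache.hadoop.hbase.util.Bytes
    public static void getExample(HTable table) throws IOException {
        Get get = new Get(Bytes.toBytes("10001"));                    // row key "10001"
        get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));  // optional: fetch only info:name
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {                         // print every returned cell
            System.out.print(Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
            System.out.print(Bytes.toString(CellUtil.cloneFamily(cell)) + ":");
            System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell)) + "==>");
            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }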

Inserting and Updating Data

In HBase, updates are also performed through the Put operation. The steps are as follows:
① Import the Put class: import org.apache.hadoop.hbase.client.Put;
② Create a Put object. Syntax: Put put = new Put(<row key as a byte array>); for example:
Put put = new Put(Bytes.toBytes("10003"));

③ Add the column data to be written to the Put object. Syntax: put.add(<family>, <qualifier>, <value>); for example:
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhaoliu"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("50"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("male"));
Note: multiple columns can be added in a single piece of code.
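A minimal sketch of a complete insert/update, following the three steps above (the row key and column values are only examples):

    // requires java.io.IOException, org.apache.hadoop.hbase.client.{HTable, Put}
    // and org.apache.hadoop.hbase.util.Bytes
    public static void putExample(HTable table) throws IOException {
        Put put = new Put(Bytes.toBytes("10003"));                                        // row key "10003"
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhaoliu"));  // info:name
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("50"));        // info:age
        put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("male"));      // info:sex
        table.put(put); // the same call inserts a new row or updates an existing one
    }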

Using Scan to Query Data: in HBase, Scan is used to retrieve multiple rows

  1. Full-table scan. The steps are as follows:
    ① Import the required classes:
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.ResultScanner;
    ② Create a Scan object. Syntax: Scan scan = new Scan();
    ③ Scan the table to obtain a ResultScanner, an iterable result set. Syntax:
    ResultScanner result = <table object>.getScanner(<Scan object>); for example:
    ResultScanner result = table.getScanner(scan);
    ④ Iterate over the results; a ResultScanner is made up of Result objects:

    for (Result res : result) {
        Cell[] ress = res.rawCells();
        for (Cell cell : ress) {
            System.out.print(Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
            System.out.print(Bytes.toString(CellUtil.cloneFamily(cell)) + ":");
            System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell)) + "==>");
            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }

⑤ Close the resources and the table. In the earlier examples we closed the table with table.close(); here we use the closeStream() method of the IOUtils class to close the table and the other resources. Syntax:
IOUtils.closeStream(<object>);
IOUtils.closeStream(result);
IOUtils.closeStream(table);

  2. Conditional queries with Scan
    ① Query by a row-key range, setting the start and stop values with the setStartRow() and setStopRow() methods. After creating the Scan object, call:
    <Scan object>.setStartRow(<start row key>);
    <Scan object>.setStopRow(<stop row key>);
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("10002"));
    scan.setStopRow(Bytes.toBytes("10003"));
    Note that the start row is inclusive while the stop row is exclusive.
    This can be written more concisely as:
    Scan scan = new Scan(Bytes.toBytes("10002"), Bytes.toBytes("10003"));

② Filter by column family, using the Scan object's addFamily() method:
Scan scan = new Scan(Bytes.toBytes("10002"), Bytes.toBytes("10003"));
scan.addFamily(Bytes.toBytes("info"));
result = table.getScanner(scan);

③ Filter by column, using the Scan object's addColumn(<family>, <qualifier>) method. For example:
Scan scan = new Scan(Bytes.toBytes("10002"), Bytes.toBytes("10003"));
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
result = table.getScanner(scan);
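Combining the restrictions above, a minimal sketch of a conditional scan over a row-key range that returns only the info:name column (the keys and column are only examples):

    // requires java.io.IOException, org.apache.hadoop.hbase.client.{HTable, Result, ResultScanner, Scan},
    // org.apache.hadoop.hbase.{Cell, CellUtil}, org.apache.hadoop.hbase.util.Bytes and org.apache.hadoop.io.IOUtils
    public static void scanExample(HTable table) throws IOException {
        Scan scan = new Scan(Bytes.toBytes("10002"), Bytes.toBytes("10003")); // start row (inclusive), stop row (exclusive)
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));         // return only info:name
        ResultScanner result = table.getScanner(scan);
        try {
            for (Result res : result) {
                for (Cell cell : res.rawCells()) {
                    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                            + Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + "==>"
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        } finally {
            IOUtils.closeStream(result); // release the scanner
        }
    }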

Deleting Data

The delete operation below deletes by row key. The steps are as follows:
① Import the Delete class: import org.apache.hadoop.hbase.client.Delete;
② Create a Delete object. Syntax: Delete delete = new Delete(<row key as a byte array>); for example:
Delete delete = new Delete(Bytes.toBytes("10003"));
③ Specify in the Delete object which column data to remove. Commonly used methods:
delete.deleteColumn(<family>, <qualifier>) deletes a single column;
delete.deleteFamily(<family>) deletes an entire column family.
④ Delete the data from the table. Syntax: table.delete(<Delete object>);
⑤ Close the table. Syntax: table.close();
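A minimal sketch of a delete by row key, following the steps above (the row key and the info:sex column are only examples):

    // requires java.io.IOException, org.apache.hadoop.hbase.client.{Delete, HTable}
    // and org.apache.hadoop.hbase.util.Bytes
    public static void deleteExample(HTable table) throws IOException {
        Delete delete = new Delete(Bytes.toBytes("10003"));                // row key "10003"
        delete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"));  // delete the newest version of one column
        // delete.deleteFamily(Bytes.toBytes("info"));                     // or delete the entire column family
        table.delete(delete);
    }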

Integrating HBase with MapReduce: Classpath Considerations and Reading from / Writing to HBase Tables in MapReduce

Integrating HBase with MapReduce requires HBase's jars on the job's classpath. The setup steps are as follows:
1. The required jars can be listed with the bin/hbase mapredcp command. The output below shows the jars needed for the integration.

    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase mapredcp
    2016-06-14 16:31:25,275 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-common-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/protobuf-java-2.5.0.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-client-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-protocol-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/high-scale-lib-1.1.1.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/zookeeper-3.4.5.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/guava-12.0.1.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/htrace-core-2.04.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/netty-3.6.6.Final.jar

2. Set the HADOOP_HOME and HBASE_HOME environment variables

    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ export HADOOP_HOME=/opt/modules/hadoop-2.5.0-cdh5.3.6
    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2

3. Set the HADOOP_CLASSPATH environment variable

    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/modules/hbase-0.98.6-hadoop2/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    2016-06-14 16:48:54,685 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

4. View the programs that the hbase-server jar can run
    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ /opt/modules/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar
    An example program must be given as the first argument.
    Valid program names are:
      CellCounter: Count cells in HBase table
      completebulkload: Complete a bulk data load.
      copytable: Export a table from local cluster to peer cluster
      export: Write table data to HDFS.
      import: Import data written by Export.
      importtsv: Import data in TSV format.
      rowcounter: Count rows in HBase table
      verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

① CellCounter: counts the number of cells in an HBase table.
② rowcounter: counts the number of rows in an HBase table. For example:
    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ /opt/modules/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar rowcounter user

(Figure: output of the rowcounter job)

③ importtsv: imports files in TSV (tab-separated values) format.
For example, upload a file named in.tsv with the following contents to the input directory under the HDFS root:
(Figure: contents of in.tsv)
    /opt/modules/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:tel user /input/in.tsv
(Figure: output of the importtsv job)

④ completebulkload:
tsv --> HFile (HDFS)
A TSV file stored on HDFS is converted directly into HFile-format files on HDFS, without going through the memstore and flush path, which makes the import much more efficient.
For example, change the row keys in in.tsv, upload it to HDFS again, and run the following command to generate the HFiles:
    /opt/modules/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar importtsv -Dimporttsv.bulk.output=/hfileoutput/ -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:tel user /input/in.tsv

Then load the HFile files into the HBase table.
HFile (HDFS) --> table; the command is:
    /opt/modules/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar completebulkload /hfileoutput/ user
(Figure: output of the completebulkload job)

Custom MapReduce job: HBase table1 --> HBase table2

The following job reads rows from the user table and writes the info:name and info:age columns into the student table.

    package com.beifeng.senior.hadoop.beifenghbase;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class User2StudentMapReduce extends Configured implements Tool {

        // step 1: Mapper
        public static class ReadUserMapper //
                extends TableMapper<ImmutableBytesWritable, Put> {
            @Override
            protected void map(ImmutableBytesWritable key, Result value,
                    Context context) throws IOException, InterruptedException {
                // user: name & age -> student: name & age : put
                // create Put
                Put put = new Put(key.get());
                // add column
                for (Cell cell : value.rawCells()) {
                    // add family: info
                    if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                        // add column: name
                        if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                            // CellUtil.cloneValue(cell)
                            // put.add(family, qualifier, value) ;
                        }
                        // add column: age
                        else if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                        }
                    }
                }
                // context output
                context.write(key, put);
            }
        }

        // step 2: Reducer
        public static class WriteStudentReducer //
                extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
            @Override
            protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                    Context context) throws IOException, InterruptedException {
                for (Put put : values) {
                    context.write(NullWritable.get(), put);
                }
            }
        }

        // step 3: Driver
        public int run(String[] args) throws Exception {
            // 1) Configuration
            Configuration conf = this.getConf();
            // 2) create job
            Job job = Job.getInstance(conf, this.getClass().getSimpleName());
            job.setJarByClass(User2StudentMapReduce.class);
            // 3) set job
            // input -> mapper -> shuffle -> reducer -> output
            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
            TableMapReduceUtil.initTableMapperJob(
                    "user",                       // input table
                    scan,                         // Scan instance to control CF and attribute selection
                    ReadUserMapper.class,         // mapper class
                    ImmutableBytesWritable.class, // mapper output key
                    Put.class,                    // mapper output value
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    "student",                    // output table
                    WriteStudentReducer.class,    // reducer class
                    job);
            job.setNumReduceTasks(1); // at least one, adjust as required
            boolean isSuccess = job.waitForCompletion(true);
            if (!isSuccess) {
                throw new IOException("error with job!");
            }
            return isSuccess ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            int status = ToolRunner.run(conf, new User2StudentMapReduce(), args);
            System.exit(status);
        }
    }