@hadoopMan
2017-03-04T14:29:55.000000Z
字数 16461
阅读 1453
hbase
想学习spark,hadoop,kafka等大数据框架,请加群459898801,满了之后请加2群224209501。后续文章会陆续公开
hbase是Hadoop 数据库,因为它具有存储数据和检索数据的功能。而且数据存储是基于HDFS,主要原因依赖hdfs存储数据量大,数据安全性高,而且硬件平台要求低,只需要商用PCServer就可以等特性。与RDBMS 相比hbase可以存储海量数据,而且检索速度可以达到准实时,最迟是秒级别。
打架hbase集群至少需要三个框架Hbase,zookeeper,HDFS。三个环节任意一个环节出问题将导致hbase集群不可用。
hbase是列式存储数据库,即nosql数据库,列数据可以十多版本的。
Cell:{rowkey + cf + column + version(timestamp)} : value
table,分为多个region,region按照rowKey进行管理,[startRowKey,stopRowKey),四个key分为5个Region。不在范围内的便于插入region头和尾部。
在默认的情况下,创建一个表时,会给一个表创建一个Region
hbase客户端存储和检索数据直接是通过zookeeper来实现的。依据rowkey在zookeeper上找到对应的region,然后根据region通过zookeeper找到对应的regionserver,这个过程并没有经过Hmaster。最后找到regionserver去检索数据。
client 会先写入 hlog 文件(WAL:Write Head Log) 再写入memStore(Memroy)达到一定阈值的时候flush到StoreFile。这样可以保证数据完整性。
hbase中所有数据文件都存储在hadoopHDFS文件系统上,主要包括两种文件类型:
1,HFile:hbase中kevalue数据的存储格式,HFile是hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级的包装,进行数据存储。
2,HLogFile:hbase中WAL(Write Ahead Log)的存储格式,物理上是Hadoop的SequenceFile。
1,HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了tables中的一个region,HRegion中由多个HStroe组成。每个HStore对应于Table中的一个column family的存储,可以看出每个columnfamily其实就是一个集中的存储单元,因此最好将具备共同IO特性的column放在一个columnfamily中,这样最高效。
2,HStore存储是Hbase存储的核心,由两部分组成,一部分是MemStore,一部分是StoreFile。MemStore是Sorted Memory Buffer,用户写入的数据首先会放入Memstore,当MemsTore满了以后会flush成一个storeFile(底层是现实HFile)。
1,用户写入数据流程
2,Client写入数据首先写入到MemStore,一直到MemStore
满,就会flush成一个StoreFile,直至增长到一定阈值,就会触发Compact合并操作,多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除,当StoreFilesCompact后,逐步形成越来越大的StoreFile,单个StoreFile大小超过一定阈值之后,触发Split操作,把当前Regio Split成两个Region,父region会下线,新的split出的两个孩子region会被HMaste分配到相应的HRegionServer上,使得原先1个region的压力得以分流到2个region上。
3,HBase只是增加数据,有所得更新和删除操作,都是在Compact阶段做的,所以用户写操作只需要进入到内存即可立即返回,从而保证IO高性能。
1,WAL意为Write ahead log,类似Mysql中的binlog,用来做灾难恢复。Hlog记录数据的所有变更,一旦数据修改,就可以从log中进行数据恢复。每个HRegionServer维护一个HLog而不是一个Hregion一个。这样不同region(来自不同tables)的日志就会混在一起,这样做的目的是不断追加单个文件相对于同时写多个文件而言,可以减少按值寻址次数,因此可以提高对table的写性能,带来的麻烦事,如果一台HRegionServer下线,为恢复其上的region,需要将HRegionServer上的log进行拆分,然后分发到其他hregionServer上进行数据恢复。
1,整个hbase集群访问入口;
2,使用HBase RPC机制与HMaster和HRegionServer进行通信;
3,与HMaster进行通信管理类操作
4,与HRegionServer进行数据读写操作
5,包含访问HBase的接口,并维护cache来加快对HRegion的访问。
1,保证任何时候,集群中只有一个HMaster
2,存储所有HRegion的寻址入口;
3,实时监控HRegionServer的上线和下线信息,并实时通知给HMaster;
4,存储HBase的schema和table元数据;
5,zookeeper Quorum存储-ROOT-表地址、HMaster地址
1,HMaster没有单点问题,HBase中可以启动多个HMaster,通过zookeeper的Master Election机制保证总有一个Master在运行主要负责Table和Region的管理工作。
2,管理用户对table的增删改查操作;
3,管理HRegionServer的负载均衡,调整Region分布;
4,RegionSplit后,负责新Region的分布;
5,在HRegionServer停机后,负责将失效HRegionServer上Region迁移工作。
1,维护HRegion,处理对这些HRegion的IO请求,向HDFS文件系统中读写数据;
2,负责切分在运行过程中变得过大的HRegion
3,Client访问HBase上数据的过程并不需要master参与(寻址访问zookeeper和HRegionServer,数据读写访问HRegionServer),HMaster仅仅维护者table和Region的元数据信息,负载很低。
1,HBase依赖zookeeper;
2,默认情况下,HBase管理zookeeper实例,比如,启动或者停止zookeeper;
3,HMaster与RegionServers启动时会向zookeeper注册;
4,zookeeper的引入使得HMaster不再是单点故障。
首先在pom.xml文件中添加如下内容
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>0.98.6-hadoop2</hbase.version>
</properties>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
然后再resource目录下添加
hbase-site.xml文件
package hbase.learn.com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
public class hbaseJAVAAPI {
public static HTable getHTableByTableName(String tableName) //
throws Exception {
// Get instance of Configuration
Configuration configuration = HBaseConfiguration.create();
// Get table instance
HTable table = new HTable(configuration, tableName);
System.out.println(table);
return table;
}
/*
* Get Data From Table By Rowkey
*/
public void getData()throws Exception {
String tableName = "user";
// Get table instance
HTable table = getHTableByTableName(tableName);
// Create Get with rowkey
Get get = new Get(Bytes.toBytes("1001")) ;
//=======================================
get.addColumn(//
Bytes.toBytes("info"),//
Bytes.toBytes("age")//
);
//=======================================
// Get Data
Result result = table.get(get) ;
// System.out.println(result);
/**
* Key:
* rowkey + cf + c + version
*
* Value :
* value
*/
for(Cell cell : result.rawCells()){
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)));
System.out.println(//
Bytes.toString(CellUtil.cloneFamily(cell))
+ ":" //
+ Bytes.toString(CellUtil.cloneQualifier(cell)) //
+ " -> " //
+ Bytes.toString(CellUtil.cloneValue(cell)) //
+ " " //
+ cell.getTimestamp()
);
System.out.println("--------------------------------");
}
// close
table.close();
}
/**
* Put Data into Table
* @throws Exception
*
* Map<String,Object>
*
*/
public void putData() throws Exception {
String tableName = "user";
// Get table instance
HTable table = getHTableByTableName(tableName);
// create put instance
Put put = new Put(Bytes.toBytes("1001")) ;
// Add a column with value
put.add(//
Bytes.toBytes("info"), //
Bytes.toBytes("sex"), //
Bytes.toBytes("male") //
) ;
put.add(//
Bytes.toBytes("info"), //
Bytes.toBytes("tel"), //
Bytes.toBytes("010-876523423") //
) ;
put.add(//
Bytes.toBytes("info"), //
Bytes.toBytes("address"), //
Bytes.toBytes("beijing") //
) ;
/*
Map<String,Object> kvs = new HashMap<String,Object>() ;
for(String key : kvs.keySet()){
put.add(//
HBaseTableConstant.HBASE_TABLE_USER_CF, //
Bytes.toBytes(key), //
Bytes.toBytes(kvs.get(key)) //
) ;
}
*/
// put data into table
table.put(put);
// close
table.close();
}
public void deleteData() throws Exception {
String tableName = "user";
// Get table instance
HTable table = getHTableByTableName(tableName);
Delete delete = new Delete (Bytes.toBytes("1001")) ;
delete.deleteFamily(Bytes.toBytes("info")) ;
// delete.deleteColumn(//
// Bytes.toBytes("info"), //
// Bytes.toBytes("sex") //
// ) ;
// delete.deleteColumns(family, qualifier) ;
// delete data
table.delete(delete);
// close
table.close();
}
public void scanData()throws Exception {
String tableName = "user";
// Get table instance
HTable table = null;
ResultScanner resultScanner = null ;
try{
table = getHTableByTableName(tableName);
Scan scan = new Scan() ;
//===========================================================
//Range
// scan.setStartRow(startRow);
// scan.setStopRow(stopRow) ;
//Scan
// Scan scan2 = new Scan(startRow,stopRow) ;
//add Column
// scan.addColumn(family, qualifier) ;
// scan.addFamily(family) ;
//Filter
// Filter filter = new PrefixFilter(prefix) ;
// scan.setFilter(filter) ;
// page
// PageFilter
//Cache
// scan.setCacheBlocks(cacheBlocks);
// scan.setCaching(caching);
//===========================================================
resultScanner = table.getScanner(scan) ;
// iterator
for(Result result : resultScanner){
for(Cell cell : result.rawCells()){
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)));
System.out.println(//
Bytes.toString(CellUtil.cloneFamily(cell))
+ ":" //
+ Bytes.toString(CellUtil.cloneQualifier(cell)) //
+ " -> " //
+ Bytes.toString(CellUtil.cloneValue(cell)) //
+ " " //
+ cell.getTimestamp()
);
}
System.out.println("##################################");
}
}catch(Exception e ){
e.printStackTrace();
}finally {
IOUtils.closeStream(resultScanner);
IOUtils.closeStream(table);
}
}
public static void main(String[] args) throws Exception {
hbaseJAVAAPI hbase=new hbaseJAVAAPI();
System.out.println("==================get table name===================");
hbase.getHTableByTableName("user");
System.out.println("==================put data===================");
hbase.putData();
System.out.println("==================get data===================");
hbase.getData();
System.out.println("==================scan data===================");
hbase.scanData();
System.out.println("==================delete data===================");
hbase.deleteData();
System.out.println("==================scan data===================");
hbase.scanData();
}
}
==================get table name===================
==================put data===================
user;hconnection-0x3be33bad
==================get data===================
user;hconnection-0x3be33bad
1001
info:age -> 18 1451913054898
--------------------------------
==================scan data===================
user;hconnection-0x3be33bad
10001
info:address -> beijing 1451914706880
10001
info:sex -> male 1451914706880
10001
info:tel -> 010-876523423 1451914706880
##################################
1001
info:address -> beijing 1451915182523
1001
info:age -> 18 1451913054898
1001
info:name -> lisi 1451913049037
1001
info:sex -> male 1451915182523
1001
info:tel -> 010-876523423 1451915182523
##################################
==================delete data===================
user;hconnection-0x3be33bad
==================scan data===================
user;hconnection-0x3be33bad
10001
info:address -> beijing 1451914706880
10001
info:sex -> male 1451914706880
10001
info:tel -> 010-876523423 1451914706880
##################################
package hbase.learn.com;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class User2BasicMapReduce extends Configured implements Tool {
// step 1 : Mapper
/**
* Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
*/
public static class ReadUserMapper extends //
TableMapper<Text, Put> {
private Text mapOutputKey = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
// get rowkey
String rowkey = Bytes.toString(key.get());
// set map output key
mapOutputKey.set(rowkey);
// ==============================================================
// create Put
Put put = new Put(key.get());
// iterator
for (Cell cell : value.rawCells()) {
// add family : info
if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
// add column : name
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
// add column : age
else if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
}
}
// context write
context.write(mapOutputKey, put);
}
}
// step 2: Reducer
/**
* Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
*/
public static class WriteBasicReducer extends //
TableReducer<Text, Put, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<Put> values, Context context)
throws IOException, InterruptedException {
for(Put put : values){
context.write(null, put);
}
}
}
// step 3: Driver
public int run(String[] args) throws Exception {
// 1) configuration
Configuration conf = this.getConf();
// 2) create job
Job job = Job.getInstance(conf, //
this.getClass().getSimpleName());
job.setJarByClass(User2BasicMapReduce.class); // class that contains
// mapper and reducer
// 3) set job
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for
// MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(//
"user", // input table
scan, // Scan instance to control CF and attribute selection
ReadUserMapper.class, // mapper class
Text.class, // mapper output key
Put.class, // mapper output value
job //
);
TableMapReduceUtil.initTableReducerJob(//
"basic", // output table
WriteBasicReducer.class, // reducer class
job //
);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
// get configuration
Configuration configuration = HBaseConfiguration.create();
// run job
int status = ToolRunner.run(//
configuration, //
new User2BasicMapReduce(), //
args //
);
// exit program
System.exit(status);
}
}
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` $HADOOP_HOME/bin/yarn jar /opt/tools/User2BasicMapReduce.jar
执行成功后
经追查源码查到在Put时,虽然实例化了正确的Put,但是Put中没有KeyValue值,表现出来的状况就是在Put的familyMap中没有任何的值,根本原因还是数据有问题,但从此问题中可以看到,如果一个Put中没有任何的value那么是不会被放入HBase中的。当然这可能也和HBase存储数据的方式有关,在RDBMS中我们可以存储类似“id01,null,null”的数据,但在HBase中,值为空的数据是不会被存储的。put使用的时候,假如为空就会报如下错误。
因为数据中不存在age列。
^C16/01/04 23:38:19 INFO mapreduce.Job: Task Id : attempt_1451912421695_0002_r_000000_2, Status : FAILED
Error: java.lang.IllegalArgumentException: No columns to insert
at org.apache.hadoop.hbase.client.HTable.validatePut(HTable.java:1304)
at org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:941)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:908)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:126)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:87)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at hbase.learn.com.User2BasicMapReduce$WriteBasicReducer.reduce(User2BasicMapReduce.java:77)
at hbase.learn.com.User2BasicMapReduce$WriteBasicReducer.reduce(User2BasicMapReduce.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
create 'student','info'
/opt/modules/hadoop-2.5.0/bin/hdfs dfs -put /opt/datas/importtsv.tsv /user/hadoop/hbase/importtsv/input
#!/bin/sh
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
student \
hdfs://miaodonghua1.host:8020/user/hadoop/hbase/importtsv/input
./importtsv.sh
执行成功后
输入如下内容
#/bin/sh
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://miaodonghua1.host:8020/user/hadoop/hbase/importtsv/hfileoutput \
student \
hdfs://miaodonghua1.host:8020/user/hadoop/hbase/importtsv/input
./hfile-importtsv.sh
执行成功后
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` $HADOOP_HOME/bin/yarn jar \ $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \ completebulkload \ /user/hadoop/hbase/importtsv/hfileoutput/ student
#/bin/sh
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://miaodonghua1.host:8020/user/hadoop/hbase/importtsv/csvhfileoutput \
-Dimporttsv.separator=, \
student \
hdfs://miaodonghua1.host:8020/user/hadoop/hbase/importtsv/input
./csv-hfile-importtsv.sh
hfile数据生成成功
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` $HADOOP_HOME/bin/yarn jar \ $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
completebulkload \
/user/hadoop/hbase/importtsv/csvhfileoutput/ student
数据导入成功