@Arslan6and6
2016-10-10T07:02:18.000000Z
spark
How do you integrate Spark with HBase?
Add the HBase dependencies to pom.xml (the Spark dependencies themselves, matching the spark.version property below, are assumed to be declared already):
<properties>
    <encoding>UTF-8</encoding>
    <hadoop.version>2.5.0</hadoop.version>
    <spark.version>1.6.1</spark.version>
    <hbase.version>0.98.6-hadoop2</hbase.version>
</properties>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <scope>compile</scope>
</dependency>
// Start ZooKeeper
[beifeng@hadoop-senior zookeeper-3.4.5-cdh5.3.6]$ bin/zkServer.sh start
// Start HBase
[beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase-daemon.sh start master
[beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase-daemon.sh start regionserver
[beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase shell
hbase(main):001:0> list
TABLE
user
1 row(s) in 1.8460 seconds
=> ["user"]
hbase(main):002:0> scan 'user'
ROW          COLUMN+CELL
 10001       column=info:name, timestamp=1472717309575, value=king
 10003       column=info:size, timestamp=1472717622108, value=85
2 row(s) in 0.1950 seconds
// Copy hbase-site.xml into the project's src/main/resources folder so it ends up on the classpath
cp /opt/modules/hbase-0.98.6-hadoop2/conf/hbase-site.xml /home/beifeng/IdeaProjects/log-analyzer/src/main/resources
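With hbase-site.xml on the classpath, HBaseConfiguration.create() picks up the ZooKeeper connection settings automatically. As a fallback, the same settings can be set programmatically; a minimal sketch (not from the original post; the host name is taken from the cluster prompt above and the default ZooKeeper client port is assumed):

import org.apache.hadoop.hbase.HBaseConfiguration

// Only needed when hbase-site.xml is not on the classpath
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop-senior")
conf.set("hbase.zookeeper.property.clientPort", "2181")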
The HBase-MapReduce integration example, for reference:
package com.beifeng.senior.hadoop.beifenghbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2StudentMapReduce extends Configured implements Tool {

    // step 1: Mapper
    public static class ReadUserMapper //
            extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context)
                throws IOException, InterruptedException {
            // user: name & age -> student: name & age : put
            // create Put
            Put put = new Put(key.get());
            // add column
            for (Cell cell : value.rawCells()) {
                // add family: info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column: name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                        // CellUtil.cloneValue(cell)
                        // put.add(family, qualifier, value);
                    }
                    // add column: age
                    else if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            // context output
            context.write(key, put);
        }
    }

    // step 2: Reducer
    public static class WriteStudentReducer //
            extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Context context) throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(NullWritable.get(), put);
            }
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {
        // 1) Configuration
        Configuration conf = this.getConf();
        // 2) create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(User2StudentMapReduce.class);
        // 3) set job
        // input -> mapper -> shuffle -> reducer -> output
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
        TableMapReduceUtil.initTableMapperJob(
                "user",                       // input table
                scan,                         // Scan instance to control CF and attribute selection
                ReadUserMapper.class,         // mapper class
                ImmutableBytesWritable.class, // mapper output key
                Put.class,                    // mapper output value
                job //
        );
        TableMapReduceUtil.initTableReducerJob(
                "student",                 // output table
                WriteStudentReducer.class, // reducer class
                job //
        );
        job.setNumReduceTasks(1); // at least one, adjust as required
        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("error with job!");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(//
                conf, //
                new User2StudentMapReduce(), //
                args //
        );
        System.exit(status);
    }
}
Creating the HBaseConfiguration can therefore be written as follows.
1. Following the HBase-MapReduce integration example:
val conf = HBaseConfiguration.create()
2. Following the same reasoning, the HBase configuration for the Spark job is:
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "user")
The RDD that reads HBase data is then built with the newAPIHadoopRDD method, whose signature is:
/*
 * def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
 *     conf: Configuration = hadoopConfiguration,
 *     fClass: Class[F],
 *     kClass: Class[K],
 *     vClass: Class[V]): RDD[(K, V)] = withScope {...}
 */
val hbaseRdd = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)
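One caveat worth noting (not from the original post): ImmutableBytesWritable and Result are not Java-serializable, so actions that ship the raw (key, value) pairs to the driver, such as collect, can fail with a NotSerializableException. Mapping to plain types first, as the programs below do before printing, avoids this; a minimal sketch assuming the hbaseRdd above:

import org.apache.hadoop.hbase.util.Bytes

// Convert the raw pairs to plain Strings before collecting them on the driver
val rowkeys = hbaseRdd.map { case (key, _) => Bytes.toString(key.get()) }
rowkeys.collect().foreach(println)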
package com.ibeifeng.bigdata.spark.app.core

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by beifeng on 9/4/16.
 */
object SparkReadHBase {
  def main(args: Array[String]) {
    // step 0: create SparkContext
    val sparkConf = new SparkConf()
      .setAppName("SparkReadHBase Application")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // create HBaseConfiguration
    /**
     * conf: Configuration = hadoopConfiguration,
     * fClass: Class[F],
     * kClass: Class[K],
     * vClass: Class[V]
     * RDD[(K, V)]
     */
    val conf = HBaseConfiguration.create()
    // INPUT_TABLE = "hbase.mapreduce.inputtable"
    conf.set(TableInputFormat.INPUT_TABLE, "user")

    val hbaseRdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    println(hbaseRdd.count() + "===============================")

    sc.stop()
  }
}
The output is as follows, matching the 2 rows in the HBase user table:
2===============================
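The MapReduce example above tunes its read with a Scan (caching increased, block cache disabled). The same Scan can be handed to TableInputFormat in the Spark job by serializing it into the configuration; a sketch (not in the original post) that would go right after conf.set(TableInputFormat.INPUT_TABLE, "user") in the program above, where convertScanToString comes from the HBase mapreduce package:

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes

// Restrict the read to info:name and reuse the caching settings from the
// MapReduce driver, then store the serialized Scan where TableInputFormat looks for it
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"))
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))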
Add one more row to the user table from the HBase shell:
hbase(main):003:0> put 'user','10002','info:name','mini'
Run the program again; the count still matches:
3===============================
Example: reading each cell of an HBase table.
CellUtil is a utility class for working with cells. Its commonly used static methods are listed below (a small usage sketch follows the list):
① CellUtil.cloneFamily(cell): returns the column family as a byte array
② CellUtil.cloneQualifier(cell): returns the column qualifier as a byte array
③ CellUtil.cloneValue(cell): returns the cell value as a byte array
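All three methods return byte arrays, so the bytes still have to be decoded with Bytes according to how they were written (strings here, since the shell stores values as strings). A minimal sketch, assuming a Result named result taken from the RDD above:

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.util.Bytes

// Decode family, qualifier and value of every cell in one Result
for (cell <- result.rawCells()) {
  val family    = Bytes.toString(CellUtil.cloneFamily(cell))
  val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
  val value     = Bytes.toString(CellUtil.cloneValue(cell))
  println(s"$family:$qualifier -> $value")
}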
Compared against the HBase-MapReduce integration example:
package com.ibeifeng.bigdata.spark.app.core

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by beifeng on 9/4/16.
 */
object SparkReadHBase {
  def main(args: Array[String]) {
    // step 0: create SparkContext
    val sparkConf = new SparkConf()
      .setAppName("SparkReadHBase Application")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // create HBaseConfiguration
    /**
     * conf: Configuration = hadoopConfiguration,
     * fClass: Class[F],
     * kClass: Class[K],
     * vClass: Class[V]
     * RDD[(K, V)]
     */
    val conf = HBaseConfiguration.create()
    // INPUT_TABLE = "hbase.mapreduce.inputtable"
    conf.set(TableInputFormat.INPUT_TABLE, "user")

    val hbaseRdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    println(hbaseRdd.count() + "===============================")

    hbaseRdd.map(tuple => {
      val rowkey = Bytes.toString(tuple._1.get())
      val result = tuple._2
      var rowStr = rowkey + ","
      for (cell <- result.rawCells()) {
        rowStr += Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
          Bytes.toString(CellUtil.cloneQualifier(cell)) + "->" +
          Bytes.toString(CellUtil.cloneValue(cell)) + "---------"
      }
      // return
      rowStr
    }).foreach(println)

    sc.stop()
  }
}
The output is as follows:
10001,info:name->king---------
10002,info:name->mini---------
10003,info:size->85---------
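Going in the other direction (not covered in the original post), the write path of the MapReduce example's initTableReducerJob can be approximated in Spark with TableOutputFormat and saveAsNewAPIHadoopDataset. A minimal sketch, assuming the student table with an info family already exists; the row keys and names used here are made up for illustration:

package com.ibeifeng.bigdata.spark.app.core

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkWriteHBase {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setAppName("SparkWriteHBase Application")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // Point TableOutputFormat at the target table (assumes 'student' already exists)
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, "student")
    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Build (rowkey, Put) pairs, mirroring the Put created in the MapReduce mapper
    val puts = sc.parallelize(Seq(("20001", "tom"), ("20002", "jerry")))
      .map { case (rowkey, name) =>
        val put = new Put(Bytes.toBytes(rowkey))
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
      }

    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}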