@Arslan6and6 2016-10-10T07:02:18.000000Z

Spark Integration with HBase

spark


How does Spark integrate with HBase?

Add the HBase dependencies to pom.xml:

    <properties>
        <encoding>UTF-8</encoding>
        <hadoop.version>2.5.0</hadoop.version>
        <spark.version>1.6.1</spark.version>
        <hbase.version>0.98.6-hadoop2</hbase.version>
    </properties>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <scope>compile</scope>
    </dependency>
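The snippet above lists only the HBase artifacts; the project of course also depends on Spark itself. A minimal sketch of that dependency, assuming the Scala 2.10 build that was the default for Spark 1.6.1 (the artifact suffix is an assumption, not taken from the original project):

    <!-- sketch: Spark core itself, assuming the Scala 2.10 build -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>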
    # start ZooKeeper
    [beifeng@hadoop-senior zookeeper-3.4.5-cdh5.3.6]$ bin/zkServer.sh start
    # start HBase
    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase-daemon.sh start master
    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase-daemon.sh start regionserver
    [beifeng@hadoop-senior hbase-0.98.6-hadoop2]$ bin/hbase shell
    hbase(main):001:0> list
    TABLE
    user
    1 row(s) in 1.8460 seconds
    => ["user"]
    hbase(main):002:0> scan 'user'
    ROW                  COLUMN+CELL
     10001               column=info:name, timestamp=1472717309575, value=king
     10003               column=info:size, timestamp=1472717622108, value=85
    2 row(s) in 0.1950 seconds
    # copy hbase-site.xml into the project's resources directory
    cp /opt/modules/hbase-0.98.6-hadoop2/conf/hbase-site.xml /home/beifeng/IdeaProjects/log-analyzer/src/main/resources
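The copied hbase-site.xml is what tells the HBase client library where ZooKeeper runs. A minimal sketch of its contents, assuming the single-node setup visible in the shell prompt above (the hbase.rootdir value and the HDFS port are assumptions; adjust to your cluster):

    <configuration>
        <!-- where the client finds ZooKeeper; host name taken from the shell prompt above -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>hadoop-senior</value>
        </property>
        <!-- assumed HDFS location of HBase data -->
        <property>
            <name>hbase.rootdir</name>
            <value>hdfs://hadoop-senior:8020/hbase</value>
        </property>
    </configuration>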

Refer to the HBase-MapReduce integration example:

    package com.beifeng.senior.hadoop.beifenghbase;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class User2StudentMapReduce extends Configured implements Tool {

        // step 1: Mapper
        public static class ReadUserMapper
                extends TableMapper<ImmutableBytesWritable, Put> {

            @Override
            protected void map(ImmutableBytesWritable key, Result value, Context context)
                    throws IOException, InterruptedException {
                // user: name & age -> student: name & age : put
                // create Put
                Put put = new Put(key.get());
                // add column
                for (Cell cell : value.rawCells()) {
                    // add family: info
                    if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                        // add column: name
                        if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                            // CellUtil.cloneValue(cell)
                            // put.add(family, qualifier, value);
                        }
                        // add column: age
                        else if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                        }
                    }
                }
                // context output
                context.write(key, put);
            }
        }

        // step 2: Reducer
        public static class WriteStudentReducer
                extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

            @Override
            protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
                    throws IOException, InterruptedException {
                for (Put put : values) {
                    context.write(NullWritable.get(), put);
                }
            }
        }

        // step 3: Driver
        public int run(String[] args) throws Exception {
            // 1) Configuration
            Configuration conf = this.getConf();
            // 2) create job
            Job job = Job.getInstance(conf, this.getClass().getSimpleName());
            job.setJarByClass(User2StudentMapReduce.class);
            // 3) set job
            // input -> mapper -> shuffle -> reducer -> output
            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
            TableMapReduceUtil.initTableMapperJob(
                    "user",                       // input table
                    scan,                         // Scan instance to control CF and attribute selection
                    ReadUserMapper.class,         // mapper class
                    ImmutableBytesWritable.class, // mapper output key
                    Put.class,                    // mapper output value
                    job
            );
            TableMapReduceUtil.initTableReducerJob(
                    "student",                    // output table
                    WriteStudentReducer.class,    // reducer class
                    job
            );
            job.setNumReduceTasks(1); // at least one, adjust as required
            boolean isSuccess = job.waitForCompletion(true);
            if (!isSuccess) {
                throw new IOException("error with job!");
            }
            return isSuccess ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            int status = ToolRunner.run(conf, new User2StudentMapReduce(), args);
            System.exit(status);
        }
    }
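To actually submit this MapReduce job, the HBase jars must be visible to the Hadoop client (TableMapReduceUtil ships the HBase dependency jars with the job by default). A rough sketch of the submission; the Hadoop installation path and the job jar path are placeholders, not values from the original setup:

    # sketch: put the HBase jars on the Hadoop classpath, then submit the job
    export HADOOP_CLASSPATH=$(/opt/modules/hbase-0.98.6-hadoop2/bin/hbase classpath)
    /opt/modules/hadoop-2.5.0/bin/yarn jar /path/to/your-mr-job.jar \
        com.beifeng.senior.hadoop.beifenghbase.User2StudentMapReduce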

The HBaseConfiguration for Spark can therefore be created the same way:
1. Following the HBase-MapReduce integration example: val conf = HBaseConfiguration.create()
2. By the same reasoning, the HBase configuration for the Spark job is:

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "user")
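Besides INPUT_TABLE, TableInputFormat exposes further keys that push parts of the scan down to HBase. They are not needed for this example; the values below are only an illustrative sketch:

    // optional sketch: read only the info column family
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "info")
    // optional sketch: limit the row key range (start inclusive, stop exclusive)
    conf.set(TableInputFormat.SCAN_ROW_START, "10001")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "10004")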

Approach for creating an RDD that reads HBase data

Following the signature of the newAPIHadoopRDD method:

    /* def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
     *     conf: Configuration = hadoopConfiguration,
     *     fClass: Class[F],
     *     kClass: Class[K],
     *     vClass: Class[V]): RDD[(K, V)] = withScope {...}
     */
    val hbaseRdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

The complete code for reading HBase data is as follows:

    package com.ibeifeng.bigdata.spark.app.core

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Created by beifeng on 9/4/16.
     */
    object SparkReadHBase {
      def main(args: Array[String]) {
        // step 0: create SparkContext
        val sparkConf = new SparkConf()
          .setAppName("SparkReadHBase Application")
          .setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        // create HBaseConfiguration
        /**
         * conf: Configuration = hadoopConfiguration,
         * fClass: Class[F],
         * kClass: Class[K],
         * vClass: Class[V]
         * returns RDD[(K, V)]
         */
        val conf = HBaseConfiguration.create()
        // INPUT_TABLE = "hbase.mapreduce.inputtable"
        conf.set(TableInputFormat.INPUT_TABLE, "user")

        val hbaseRdd = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )

        println(hbaseRdd.count() + "===============================")

        sc.stop()
      }
    }

The output is shown below and matches the 2 rows in the HBase user table:

    2===============================

Add one more row to the user table from the HBase shell:

    hbase(main):003:0> put 'user','10002','info:name','mini'

Run the program again; the result still matches:

    3===============================

Example: reading the individual cells of an HBase table.
The CellUtil class is a utility class for working with cells. Its commonly used static methods are:

① CellUtil.cloneFamily(cell): returns the column family

② CellUtil.cloneQualifier(cell): returns the column qualifier

③ CellUtil.cloneValue(cell): returns the cell value
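As a quick illustration of these three helpers (a sketch, as it would be typed into spark-shell; cellToString is a hypothetical helper name, not part of HBase):

    import org.apache.hadoop.hbase.{Cell, CellUtil}
    import org.apache.hadoop.hbase.util.Bytes

    // sketch: render one Cell as "family:qualifier->value" using the helpers above
    def cellToString(cell: Cell): String =
      Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
        Bytes.toString(CellUtil.cloneQualifier(cell)) + "->" +
        Bytes.toString(CellUtil.cloneValue(cell))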

Compare this with the cell handling in the HBase-MapReduce integration example above:

    package com.ibeifeng.bigdata.spark.app.core

    import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Created by beifeng on 9/4/16.
     */
    object SparkReadHBase {
      def main(args: Array[String]) {
        // step 0: create SparkContext
        val sparkConf = new SparkConf()
          .setAppName("SparkReadHBase Application")
          .setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        // create HBaseConfiguration
        /**
         * conf: Configuration = hadoopConfiguration,
         * fClass: Class[F],
         * kClass: Class[K],
         * vClass: Class[V]
         * returns RDD[(K, V)]
         */
        val conf = HBaseConfiguration.create()
        // INPUT_TABLE = "hbase.mapreduce.inputtable"
        conf.set(TableInputFormat.INPUT_TABLE, "user")

        val hbaseRdd = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )

        println(hbaseRdd.count() + "===============================")

        hbaseRdd.map(tuple => {
          val rowkey = Bytes.toString(tuple._1.get())
          val result = tuple._2
          var rowStr = rowkey + ","
          for (cell <- result.rawCells()) {
            rowStr += Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
              Bytes.toString(CellUtil.cloneQualifier(cell)) + "->" +
              Bytes.toString(CellUtil.cloneValue(cell)) + "---------"
          }
          // return
          rowStr
        }).foreach(println)

        sc.stop()
      }
    }

The output is as follows:

    10001,info:name->king---------
    10002,info:name->mini---------
    10003,info:size->85---------
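The MapReduce example above also writes the rows back into a student table. The same thing can be done from Spark with TableOutputFormat and saveAsNewAPIHadoopDataset. The following is only a rough sketch under the assumption that a student table with an info column family already exists; it would be inserted before sc.stop():

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    // sketch: copy every cell of "user" into "student", mirroring User2StudentMapReduce
    val writeConf = HBaseConfiguration.create()
    writeConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
    val job = Job.getInstance(writeConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    hbaseRdd.map { case (key, result) =>
      val put = new Put(key.get())
      result.rawCells().foreach(cell => put.add(cell)) // Put.add(Cell) in HBase 0.98
      (new ImmutableBytesWritable(key.get()), put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)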