@Arslan6and6 2016-10-10

Spark SQL / Hive on Spark



Review
-1, For Spark Core, the application entry point is SparkContext.
-2, For Spark SQL, the application entry point is SQLContext, whose subclass is HiveContext; underneath, both are built on SparkContext.
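
In spark-shell both entry points are already created as sc and sqlContext. For reference, a minimal sketch of how they would be built in a standalone Spark 1.6 application (the app name and master used here are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Spark Core entry point
    val conf = new SparkConf().setAppName("entry-points-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Spark SQL entry point: HiveContext is a subclass of SQLContext and wraps the SparkContext
    val sqlContext = new HiveContext(sc)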

Create a folder to hold the extra jars Spark needs and copy in the MySQL JDBC driver:

    [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ mkdir inputJars
    [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ cp /opt/modules/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/

Spark Core: the RDD form

Upload the people.txt sample file from the examples folder to the default HDFS user directory:

    [beifeng@hadoop-senior resources]$ more people.txt
    Michael, 29
    Andy, 30
    Justin, 19
    bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/people.txt
    [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar

Define a case class to match the records:

    scala> case class People(name: String, age: Int)
    defined class People

Load people.txt into an RDD and split each line:

    scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line => line.split(",")).map(x => People(x(0), x(1).trim.toInt))
    ...
    rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[7] at map at <console>:29
    scala> rdd.collect
    res2: Array[People] = Array(People(Michael,29), People(Andy,30), People(Justin,19))

Spark SQL: the DataFrame form

Upload people.json to HDFS and load it as a DataFrame:

    [beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/people.json
    scala> val df = sqlContext.read.json("/user/beifeng/people.json")
    ...
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    scala> df.show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+

Compare the two:

    rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[7] at map at <console>:29
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

A DataFrame already exposes the column names and types at load time, while the RDD is only known by its Scala element type.
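
Beyond the one-line summary, df.printSchema prints the full schema tree, which the RDD cannot provide. For the people.json DataFrame above the output should look roughly like this:

    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)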

Reading a Hive table

    scala> val hive_table_df = sqlContext.read.table("test.my_usermysql2hive")
    hive_table_df: org.apache.spark.sql.DataFrame = [id: tinyint, account: string, passwd: string]
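
Because sqlContext in this build is Hive-enabled, the same table can also be queried with SQL directly; a small sketch using the table above (column names taken from the schema just shown):

    scala> sqlContext.sql("select id, account, passwd from test.my_usermysql2hive limit 5").show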

How to create a DataFrame

1. External data sources - built-in (a short usage sketch follows this list)

--1. json
--2. hive
--3. jdbc
--4. parquet/orc (parquet is the default format)
--5. text (rarely useful)

2. Conversion from an RDD

--1. Option 1: an RDD of case-class instances, RDD[CaseClass]
--2. Option 2: a custom schema applied to an RDD of Rows

3. External data sources - require developing a connector against the data source API yourself

--1. Elasticsearch (search)
--2. HBase (e.g. Huawei's connector)
--3. Solr
...
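
All of the built-in sources share the same DataFrameReader entry point. A minimal sketch, reusing the paths and connection settings that appear elsewhere in this note:

    // built-in sources
    val json_df    = sqlContext.read.json("/user/beifeng/people.json")
    val hive_df    = sqlContext.read.table("test.my_usermysql2hive")
    val parquet_df = sqlContext.read.load("/user/beifeng/users.parquet")   // parquet is the default format
    val jdbc_df    = sqlContext.read.jdbc(
      "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123",
      "City", new java.util.Properties())
    // third-party sources (ES, HBase, ...) plug in through read.format("..."),
    // provided the connector jar is on the classpath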

Reading a Parquet file

    [beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/users.parquet
    scala> val user_df = sqlContext.read.parquet("/user/beifeng/users.parquet")
    user_df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>]
    scala> user_df.show
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+

Parquet is the default read format, so read.load works without naming it:

    scala> val user_df = sqlContext.read.load("/user/beifeng/users.parquet")
    user_df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>]

Writing data to MySQL

Step 1: load the source table (the Hive table world.city) into a DataFrame.

    scala> val w2mysq_df = sqlContext.read.table("world.city")
    w2mysq_df: org.apache.spark.sql.DataFrame = [id: int, name: string, countrycode: string, district: string, population: int]

Step 2: build the JDBC URL naming the target MySQL database, with the user name and password appended as query parameters.

    scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123"
    url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123

Step 3: import Properties, because the jdbc method signature is (url, target table name, Properties instance):

    def jdbc(url: String, table: String, properties: Properties): DataFrame = {
      jdbc(url, table, JDBCRelation.columnPartition(null), properties)
    }

    scala> import java.util.Properties
    import java.util.Properties

Step 4: instantiate Properties.

    scala> val prop = new Properties()
    prop: java.util.Properties = {}

Step 5: write the DataFrame to MySQL.

    scala> w2mysq_df.write.jdbc(url, "fromSparkSQL", prop)
    java.sql.SQLException: No suitable driver

Error: no suitable driver was found, which means loading the driver with --jars is not enough here. Fix: exit spark-shell, export the driver onto the Spark classpath, and reopen spark-shell in the same session:

    [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
    [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2]

Then rerun steps 1 through 5; the table fromSparkSQL imported from Hive is now visible in MySQL:

    mysql> show tables;
    +-----------------+
    | Tables_in_world |
    +-----------------+
    | City            |
    | Country         |
    | CountryLanguage |
    | fromSparkSQL    |
    +-----------------+
    4 rows in set (0.00 sec)

    mysql> select * from fromSparkSQL limit 5;
    +------+----------------+-------------+-----------+------------+
    | id   | name           | countrycode | district  | population |
    +------+----------------+-------------+-----------+------------+
    | 1021 | Sawangan       | IDN         | West Java |      91100 |
    |    1 | Kabul          | AFG         | Kabol     |    1780000 |
    | 1022 | Banyuwangi     | IDN         | East Java |      89900 |
    |    2 | Qandahar       | AFG         | Qandahar  |     237500 |
    | 1023 | Tanjung Pinang | IDN         | Riau      |      89900 |
    +------+----------------+-------------+-----------+------------+
    5 rows in set (0.00 sec)
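
To verify the write from inside spark-shell, the same JDBC entry point can read the new table back; a small sketch reusing the url and prop values from the steps above (the row-count check is only an illustration):

    scala> val check_df = sqlContext.read.jdbc(url, "fromSparkSQL", prop)
    scala> check_df.count   // should match the row count of world.city in Hive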

When reading MySQL data, the user name and password can instead be passed through the Properties object:

    scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world"
    url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world
    scala> val props = new Properties()
    props: java.util.Properties = {}
    scala> props.put("user", "root")
    res7: Object = null
    scala> props.put("password", "123")
    res8: Object = null
    scala> val mysql_city_df = sqlContext.read.jdbc(url, "City", props)
    mysql_city_df: org.apache.spark.sql.DataFrame = [ID: int, Name: string, CountryCode: string, District: string, Population: int]

Example: join world.Country in MySQL with world.city in Hive on the country code

Connect to the MySQL world database, with the user name and password appended to the URL:

    scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123"
    url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123

Instantiate Properties:

    scala> val prop = new Properties()
    prop: java.util.Properties = {}

Read world.Country from MySQL into a DataFrame:

    scala> val mysql_country_df = sqlContext.read.jdbc(url, "Country", prop)
    mysql_country_df: org.apache.spark.sql.DataFrame = [Code: string, Name: string, Continent: string, Region: string, SurfaceArea: double, IndepYear: int, Population: int, LifeExpectancy: double, GNP: double, GNPOld: double, LocalName: string, GovernmentForm: string, HeadOfState: string, Capital: int, Code2: string]

Read world.city from Hive into a DataFrame:

    scala> val hive_city_df = sqlContext.read.table("world.city")
    hive_city_df: org.apache.spark.sql.DataFrame = [id: int, name: string, countrycode: string, district: string, population: int]

Join the two DataFrames on mysql_country_df("Code") === hive_city_df("countrycode"). When the join column names differ, the syntax is leftDF.join(rightDF, leftDF("leftCol") === rightDF("rightCol")):

    scala> val join_df = mysql_country_df.join(hive_city_df, mysql_country_df("Code")===hive_city_df("countrycode"))
    join_df: org.apache.spark.sql.DataFrame = [Code: string, Name: string, Continent: string, Region: string, SurfaceArea: double, IndepYear: int, Population: int, LifeExpectancy: double, GNP: double, GNPOld: double, LocalName: string, GovernmentForm: string, HeadOfState: string, Capital: int, Code2: string, id: int, name: string, countrycode: string, district: string, population: int]

If the join column has the same name on both sides, the shorter form leftDF.join(rightDF, "colName") can be used instead. Register the joined DataFrame as a temporary table with DataFrame.registerTempTable("tempTableName"):

    scala> join_df.registerTempTable("join_temp_countrycity")

Then it can be queried conveniently with SQL (a further example follows this block):

    scala> sqlContext.sql("select Code, id from join_temp_countrycity limit 5").show
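
Any SQL can now run against the temp table; for example, a query written for this note (not part of the original session) that counts cities per country code:

    scala> sqlContext.sql("select Code, count(id) as city_count from join_temp_countrycity group by Code order by city_count desc limit 5").show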


Inferring the schema with reflection

    scala> case class People(name: String, age: Int)
    scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line => line.split(",")).map(x => People(x(0), x(1).trim.toInt))
    scala> val people_df = rdd.toDF()
    scala> people_df.show
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
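
The reflected DataFrame can then be registered and queried like any other table; a short sketch modeled on the official Spark 1.6 example:

    scala> people_df.registerTempTable("people")
    scala> val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    scala> teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    Name: Justin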

Constructing a schema programmatically and applying it to create a DataFrame

--1, Convert the RDD to an RDD[Row]:

    scala> import org.apache.spark.sql._
    scala> val rdd = sc.textFile("/user/beifeng/people.txt")
    //scala> rdd.collect
    //res7: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
    //rdd.map(line => line.split(", ")).collect
    //res6: Array[Array[String]] = Array(Array(Michael, 29), Array(Andy, 30), Array(Justin, 19))
    scala> val rowRdd = rdd.map(line => line.split(", ")).map(x => Row(x(0), x(1).toInt))
    //scala> rowRdd.collect
    //res8: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])

--2, Create the schema:

    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    scala> val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))

--3, Apply the schema to the RDD of Rows:

    scala> val people_df = sqlContext.createDataFrame(rowRdd, schema)
    people_df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    scala> people_df.show
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+

The official Spark 1.6.1 example:

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Create an RDD
    val people = sc.textFile("examples/src/main/resources/people.txt")
    // The schema is encoded in a string
    val schemaString = "name age"
    // Import Row.
    import org.apache.spark.sql.Row;
    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType, StructField, StringType};
    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register the DataFrames as a table.
    peopleDataFrame.registerTempTable("people")
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index or by field name.
    results.map(t => "Name: " + t(0)).collect().foreach(println)

Converting a DataFrame to other forms when needed

When needed, a DataFrame can be converted back to an RDD; the element type becomes Row:

    scala> people_df.rdd
    res5: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[8] at rdd at <console>:42

Conversion to JSON is useful when the front end needs the data asynchronously:

    scala> people_df.toJSON.collect
    res8: Array[String] = Array({"name":"Michael","age":29}, {"name":"Andy","age":30}, {"name":"Justin","age":19})

SaveMode
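
SaveMode controls what DataFrameWriter does when the output target already exists. A minimal sketch, reusing w2mysq_df, url and prop from the MySQL example above (the target table name fromSparkSQL2 is made up here):

    import org.apache.spark.sql.SaveMode

    // ErrorIfExists is the default; Overwrite, Append and Ignore change the behavior
    // when the target table already exists
    w2mysq_df.write.mode(SaveMode.Overwrite).jdbc(url, "fromSparkSQL2", prop)
    w2mysq_df.write.mode(SaveMode.Append).jdbc(url, "fromSparkSQL2", prop)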

Spark's external data source packages: how to read other file formats

  1. https://github.com/databricks
  2. spark-csv: for CSV text files, e.g. to load a CSV file into a Hive table and read it back from Hive; download the project and build it into a jar (see the sketch after this list)
  3. spark-avro: Avro is a serialization framework in the Hadoop ecosystem and an Apache top-level project
  4. http://avro.apache.org/
  5. spark-xml: for reading XML files
  6. learning-spark: source code for the Learning Spark book
  7. reference-apps: the reference applications used in this course come from here

  1. https://spark-packages.org/
  2. spork
  3. spark-jobserver: worth a close look when time permits
  4. zeppelin
  5. spark-hbase
  6. spark-mongodb
  7. spark-sorted
  8. spark-kafka
  9. spark-notebook
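
As an illustration of how such a package plugs into the data source API, a sketch of reading a CSV file with spark-csv on Spark 1.6 (the file path is hypothetical, and the spark-csv jar, or --packages com.databricks:spark-csv, must be on the classpath):

    // the option names below are the documented spark-csv options
    val csv_df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")        // first line contains column names
      .option("inferSchema", "true")   // infer column types instead of all strings
      .load("/user/beifeng/cities.csv")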



Dataset

Datasets use Encoders to serialize objects for transfer over the network and for reading and writing to disk.

    scala> val ds = Seq(1, 2, 3).toDS()
    16/08/19 21:48:39 INFO codegen.GenerateUnsafeProjection: Code generated in 114.079551 ms
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    scala> ds.map(_ + 1).collect
    res0: Array[Int] = Array(2, 3, 4)
    scala> val df = ds.toDF()
    df: org.apache.spark.sql.DataFrame = [value: int]
    scala> case class Person(name: String, age: Long)
    defined class Person
    // People below is the case class defined earlier (age: Int); Person (age: Long) is used for the JSON file
    scala> val ds = Seq(People("Andy", 32)).toDS()
    16/08/19 21:56:39 INFO codegen.GenerateUnsafeProjection: Code generated in 15.188177 ms
    ds: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
    // converting a DataFrame to a Dataset
    scala> val json_df = sqlContext.read.json("people.json")
    json_df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    // a matching case class is required: both the field names and the field types must line up
    scala> val json_ds = json_df.as[Person]
    json_ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

Related articles:
-1,https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
-2,https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Spark SQL & ES

Use Elasticsearch (ES) as a secondary index.
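
A hedged sketch of what querying ES from Spark SQL typically looks like with the elasticsearch-hadoop (elasticsearch-spark) connector; the node address and the index/type name here are assumptions, and the connector jar must be on the classpath:

    // read an ES index as a DataFrame, then query it alongside Hive/MySQL tables
    val es_df = sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "hadoop-senior.ibeifeng.com")
      .option("es.port", "9200")
      .load("myindex/mytype")
    es_df.registerTempTable("es_docs")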
