@Arslan6and6
2016-10-10T07:00:40.000000Z
Review
-1, For Spark Core, the application entry point is
SparkContext
-2, For Spark SQL, the application entry point is
SQLContext (subclass -> HiveContext),
which is built on top of SparkContext
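In spark-shell both entry points already exist as sc and sqlContext. For a standalone program, a minimal sketch of building them (assuming Spark 1.6 compiled with Hive support) looks like this:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
val sc = new SparkContext(conf)                                  // Spark Core entry point
val sqlContext = new org.apache.spark.sql.SQLContext(sc)         // Spark SQL entry point
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  // subclass with Hive support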
Create a directory to hold the extra JARs Spark needs
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ mkdir inputJars
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ cp /opt/modules/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/
Spark Core: the RDD form
Upload the sample file people.txt to the default HDFS user directory
[beifeng@hadoop-senior resources]$ more people.txt
Michael, 29
Andy, 30
Justin, 19
bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/people.txt
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
Define a case class to serve as the record pattern
scala> case class People (name:String, age:Int)
defined class People
Load people.txt into an RDD, split each line, and map the fields to People
scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line=>line.split(",")).map(x => People(x(0),x(1).trim.toInt))
...
rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[7] at map at <console>:29
scala> rdd.collect
res2: Array[People] = Array(People(Michael,29), People(Andy,30), People(Justin,19))
Spark SQL: the DataFrame form
Upload people.json
[beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/people.json
scala> val df = sqlContext.read.json("/user/beifeng/people.json")
...
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Comparison:
rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[7] at map at <console>:29
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
With a DataFrame, the column names and types are already visible when the file is loaded; an RDD only knows its element type.
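To inspect the inferred schema explicitly, printSchema on the df loaded above prints it as a tree:
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)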
Reading a Hive table
scala> val hive_table_df = sqlContext.read.table("test.my_usermysql2hive")
hive_table_df: org.apache.spark.sql.DataFrame = [id: tinyint, account: string, passwd: string]
Built-in data sources (a read sketch for each follows this list):
--1. json
--2. hive
--3. jdbc
--4. parquet/orc (the default)
--5. text (of limited use)
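A minimal sketch of reading each of these through the unified read API, reusing the paths, tables, and credentials that appear elsewhere in this note:
import java.util.Properties

val jsonDF    = sqlContext.read.json("/user/beifeng/people.json")
val hiveDF    = sqlContext.read.table("test.my_usermysql2hive")      // requires Hive support
val prop      = new Properties()
prop.put("user", "root"); prop.put("password", "123")
val url       = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world"
val jdbcDF    = sqlContext.read.jdbc(url, "City", prop)
val parquetDF = sqlContext.read.load("/user/beifeng/users.parquet")  // parquet is the default format
val textDF    = sqlContext.read.text("/user/beifeng/people.txt")     // a single string column named "value"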
Two ways to give an RDD a schema (both are demonstrated later in this section):
--1. Method 1: RDD[case class] (schema inferred by reflection)
--2. Method 2: define a custom schema explicitly
Third-party data sources:
--1. ES (search)
--2. HBase (e.g., Huawei's connector)
--3. solr
...
Reading a Parquet file
[beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/users.parquet
scala> val user_df = sqlContext.read.parquet("/user/beifeng/users.parquet")
user_df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>]
scala> user_df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
The default format for read.load is Parquet
scala> val user_df = sqlContext.read.load("/user/beifeng/users.parquet")
user_df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>]
Writing a DataFrame to MySQL
Step 1: load the source table (here, the Hive table world.city) into a DataFrame
scala> val w2mysq_df = sqlContext.read.table("world.city")
w2mysq_df: org.apache.spark.sql.DataFrame = [id: int, name: string, countrycode: string, district: string, population: int]
Step 2: build the JDBC URL for the target MySQL database, appending the user name and password as query parameters
scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123"
url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123
Step 3: import Properties, since the jdbc method's parameter list is (url, target table name, Properties instance):
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
  jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}
scala> import java.util.Properties
import java.util.Properties
Step 4: instantiate Properties
scala> val prop = new Properties()
prop: java.util.Properties = {}
Step 5: write the DataFrame to MySQL
scala> w2mysq_df.write.jdbc(url, "fromSparkSQL", prop)
java.sql.SQLException: No suitable driver
Error: no suitable driver. Loading the driver via --jars is not sufficient for the JDBC write path.
Workaround:
After quitting spark-shell,
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
then restart spark-shell in the same terminal session:
bin/spark-shell --master local[2]
Re-run steps 1 through 5 above; the table "fromSparkSQL" imported from Hive can now be seen in MySQL:
mysql> show tables;
+-----------------+
| Tables_in_world |
+-----------------+
| City |
| Country |
| CountryLanguage |
| fromSparkSQL |
+-----------------+
4 rows in set (0.00 sec)
mysql> select * from fromSparkSQL limit 5;
+------+----------------+-------------+-----------+------------+
| id | name | countrycode | district | population |
+------+----------------+-------------+-----------+------------+
| 1021 | Sawangan | IDN | West Java | 91100 |
| 1 | Kabul | AFG | Kabol | 1780000 |
| 1022 | Banyuwangi | IDN | East Java | 89900 |
| 2 | Qandahar | AFG | Qandahar | 237500 |
| 1023 | Tanjung Pinang | IDN | Riau | 89900 |
+------+----------------+-------------+-----------+------------+
5 rows in set (0.00 sec)
Alternatively, keep the credentials out of the URL and pass them via Properties:
scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world"
url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world
scala> val props = new Properties()
props: java.util.Properties = {}
scala> props.put("user","root")
res7: Object = null
scala> props.put("password","123")
res8: Object = null
scala> val mysql_city_df = sqlContext.read.jdbc(url, "City", props)
mysql_city_df: org.apache.spark.sql.DataFrame = [ID: int, Name: string, CountryCode: string, District: string, Population: int]
Example: JOIN MySQL's world.Country with Hive's world.city on countrycode
Connect to the MySQL world database, with user name and password appended
scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123"
url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123
Instantiate Properties
scala> val prop = new Properties()
prop: java.util.Properties = {}
Read MySQL's world.Country into a DataFrame
scala> val mysql_Country_df = sqlContext.read.jdbc(url, "Country", prop)
mysql_Country_df: org.apache.spark.sql.DataFrame = [Code: string, Name: string, Continent: string, Region: string, SurfaceArea: double, IndepYear: int, Population: int, LifeExpectancy: double, GNP: double, GNPOld: double, LocalName: string, GovernmentForm: string, HeadOfState: string, Capital: int, Code2: string]
Read Hive's world.city into a DataFrame
scala> val hive_city_df = sqlContext.read.table("world.city")
hive_city_df: org.apache.spark.sql.DataFrame = [id: int, name: string, countrycode: string, district: string, population: int]
Join the two tables on mysql_Country_df("Code") === hive_city_df("countrycode")
When the join column names differ, the join() syntax is: leftDF.join(rightDF, leftDF("joinColumn") === rightDF("joinColumn"))
scala> val join_df = mysql_Country_df.join(hive_city_df, mysql_Country_df("Code")===hive_city_df("countrycode"))
join_df: org.apache.spark.sql.DataFrame = [Code: string, Name: string, Continent: string, Region: string, SurfaceArea: double, IndepYear: int, Population: int, LifeExpectancy: double, GNP: double, GNPOld: double, LocalName: string, GovernmentForm: string, HeadOfState: string, Capital: int, Code2: string, id: int, name: string, countrycode: string, district: string, population: int]
If the join column names are the same, the call collapses to:
leftDF.join(rightDF, "joinColumnName")
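For example, a small sketch of the same-name form: rename the MySQL side's Code column first so both DataFrames share countrycode, and the join condition reduces to the column name:
val renamed = mysql_Country_df.withColumnRenamed("Code", "countrycode")
val joined  = renamed.join(hive_city_df, "countrycode")  // the shared column appears only once in the result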
Use DataFrame.registerTempTable("tableName") to register the joined DataFrame as a temporary table
scala> join_df.registerTempTable("join_temp_countrycity")
Then it can be queried conveniently with SQL:
scala> sqlContext.sql("select Code, id from join_temp_countrycity limit 5").show
Inferring a schema using reflection
scala> case class People(name:String, age:Int)
scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line=>line.split(",")).map(x => People(x(0),x(1).trim.toInt))
scala> val people_df = rdd.toDF()
scala> people_df.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
Constructing a schema programmatically and applying it to a DataFrame
--1,RDD -> RDD[Row]
scala> import org.apache.spark.sql._
scala> val rdd = sc.textFile("/user/beifeng/people.txt")
//scala> rdd.collect
//res7: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
//rdd.map(line=>line.split(", ")).collect
//res6: Array[Array[String]] = Array(Array(Michael, 29), Array(Andy, 30), Array(Justin, 19))
scala> val rowRdd = rdd.map(line=>line.split(", ")).map(x=>Row(x(0), x(1).toInt))
//scala> rowRdd.collect
//res8: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
--2,Create the schema
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
--3,Apply the schema to the RDD of Rows
scala> val people_df = sqlContext.createDataFrame(rowRdd, schema)
people_df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> people_df.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
Official Spark 1.6.1 example code:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
Converting a DataFrame to other forms when needed
When needed, a DataFrame can be converted back to an RDD; the element type becomes Row
scala> people_df.rdd
res5: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[8] at rdd at <console>:42
For asynchronous transfer to a front end, convert it to JSON:
scala> people_df.toJSON.collect
res8: Array[String] = Array({"name":"Michael","age":29}, {"name":"Andy","age":30}, {"name":"Justin","age":19})
SaveMode (controls what happens when the write target already exists)
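A minimal sketch of the four modes, reusing w2mysq_df, url, and prop from the MySQL example above (the output paths are hypothetical):
import org.apache.spark.sql.SaveMode

// SaveMode.ErrorIfExists is the default: fail if the target already exists
w2mysq_df.write.mode(SaveMode.Append).jdbc(url, "fromSparkSQL", prop)       // add rows to the existing table
w2mysq_df.write.mode(SaveMode.Overwrite).parquet("/user/beifeng/city_out")  // replace any existing output
w2mysq_df.write.mode(SaveMode.Ignore).json("/user/beifeng/city_json")       // silently skip if output exists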
Projects on https://github.com/databricks :
spark-csv: reads CSV text files. For example, to load a CSV file into a Hive table and then read it back from Hive, download this project and build it into a JAR, or pull it as a package (see the sketch after this list).
spark-avro: Spark connector for Avro, a serialization framework in the Hadoop ecosystem and an Apache top-level project
http://avro.apache.org/
spark-xml: for reading XML files
learning-spark: source code accompanying the book Learning Spark
reference-apps: the reference examples used in this course come from here
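A sketch of pulling spark-csv as a package and reading a CSV file with it; the version number and file path are illustrative, and the options follow the project's README for Spark 1.x:
// Launch with the package resolved from spark-packages (version illustrative):
//   bin/spark-shell --master local[2] --packages com.databricks:spark-csv_2.10:1.5.0
val csvDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // first line holds the column names
  .option("inferSchema", "true")   // infer column types from the data
  .load("/user/beifeng/some.csv")  // hypothetical path
csvDF.registerTempTable("csv_temp")  // it can then be queried with SQL or written to Hive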
More packages on https://spark-packages.org/ :
spork
spark-jobserver (definitely worth a close look when time permits)
zeppelin
spark-hbase
spark-mongodb
spark-sorted
spark-kafka
spark-notebook
Datasets: use Encoders to serialize objects for network transfer and disk I/O
scala> val ds = Seq(1, 2, 3).toDS()
16/08/19 21:48:39 INFO codegen.GenerateUnsafeProjection: Code generated in 114.079551 ms
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.map(_ + 1).collect
res0: Array[Int] = Array(2, 3, 4)
scala> val df = ds.toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> case class Person(name:String, age:Long)
defined class Person
//Note: this reuses the People case class (age: Int) defined earlier; Person (age: Long) is defined to match the JSON schema below
scala> val ds = Seq(People("Andy",32)).toDS()
16/08/19 21:56:39 INFO codegen.GenerateUnsafeProjection: Code generated in 15.188177 ms
ds: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
//Convert a DataFrame to a Dataset
scala> val json_df = sqlContext.read.json("people.json")
json_df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
//A matching case class is required: both field names and field types must match
scala> val json_ds = json_df.as[Person]
json_ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
Related articles:
-1,https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
-2,https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
Use ES as a secondary index