@Arslan6and6
2016-10-10T07:00:40.000000Z
spark
Review
-1, For Spark Core, the application entry point is
SparkContext
-2, For Spark SQL, the application entry point is
SQLContext (subclass -> HiveContext)
which wraps a SparkContext underneath
Create a folder to hold the jars Spark needs, and copy the MySQL JDBC driver into it:
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ mkdir inputJars
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ cp /opt/modules/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/
Spark Core: the RDD approach
Upload the example file people.txt to the default HDFS user directory:
[beifeng@hadoop-senior resources]$ more people.txt
Michael, 29
Andy, 30
Justin, 19
bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/people.txt
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
Define a case class that describes one record:
scala> case class People (name:String, age:Int)
defined class People
Load people.txt into an RDD, split each line on the comma, and map it to People:
scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line=>line.split(",")).map(x => People(x(0),x(1).trim.toInt))
...
rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[7] at map at <console>:29
scala> rdd.collect
res2: Array[People] = Array(People(Michael,29), People(Andy,30), People(Justin,19))
Spark SQL: the DataFrame approach
Upload people.json:
[beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/people.json
scala> val df = sqlContext.read.json("/user/beifeng/people.json")
...
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
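Comparison:
rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[7] at map at <console>:29
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
A DataFrame reveals its field names and types as soon as the file is loaded, whereas the RDD only reports its element type (People).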
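To see the inferred schema without HDFS, here is a minimal standalone sketch. It assumes Spark 1.6 (spark-core and spark-sql) on the classpath and local mode. It feeds the three records of people.json to `DataFrameReader.json` as an in-memory `RDD[String]`. The object name `JsonSchemaSketch` is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical standalone sketch; assumes Spark 1.6 on the classpath.
object JsonSchemaSketch {
  def inferSchema(): Seq[(String, String)] = {
    val sc = new SparkContext(new SparkConf().setAppName("json-schema").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      // The same three records as examples/src/main/resources/people.json
      val lines = sc.parallelize(Seq(
        """{"name":"Michael"}""",
        """{"name":"Andy", "age":30}""",
        """{"name":"Justin", "age":19}"""))
      // Field names and types are inferred while loading; no case class is needed
      val df = sqlContext.read.json(lines)
      df.printSchema()
      df.schema.fields.map(f => (f.name, f.dataType.simpleString)).toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = inferSchema().foreach(println)
}
```

JSON integers come back as `bigint`, which is why the DataFrame above shows `age: bigint`.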
Reading a Hive table
scala> val hive_table_df = sqlContext.read.table("test.my_usermysql2hive")
hive_table_df: org.apache.spark.sql.DataFrame = [id: tinyint, account: string, passwd: string]
Data sources a DataFrame can be read from:
--1.json
--2.hive
--3.jdbc
--4.parquet/orc (parquet is the default)
--5.text (of little use)
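File-based sources share one generic entry point: `format(...)` followed by `save`/`load`. Below is a minimal sketch under these assumptions: Spark 1.6, local mode, and a throwaway temp directory. The object name `FormatRoundTripSketch` is hypothetical. It writes the same data as JSON and as Parquet and reads each back. ORC is left out because in Spark 1.6 the ORC source ships with the Hive module and needs a HiveContext.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch; assumes Spark 1.6 and a writable local temp directory.
object FormatRoundTripSketch {
  def run(): (Long, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("formats").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val dir = Files.createTempDirectory("spark-formats").toString
      val df = sc.parallelize(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19))).toDF("name", "age")
      // format(...) names the data source; save/load are the generic write/read calls
      df.write.format("json").save(s"$dir/people_json")
      df.write.format("parquet").save(s"$dir/people_parquet")
      val fromJson = sqlContext.read.format("json").load(s"$dir/people_json")
      val fromParquet = sqlContext.read.format("parquet").load(s"$dir/people_parquet")
      (fromJson.count(), fromParquet.count())
    } finally {
      sc.stop()
    }
  }
}
```

`read.json(path)` and `read.parquet(path)` are shorthands for the same calls.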
Two ways to turn an RDD into a DataFrame:
--1.Method 1
RDD[case class] (schema inferred by reflection)
--2.Method 2
a custom schema
Other systems to combine with Spark SQL:
--1.ES (Elasticsearch) search
--2.HBase, Huawei
--3.solr
...
Reading a Parquet file
[beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources/users.parquet
scala> val user_df = sqlContext.read.parquet("/user/beifeng/users.parquet")
user_df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>]
scala> user_df.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
The default file format for read.load is parquet:
scala> val user_df = sqlContext.read.load("/user/beifeng/users.parquet")
user_df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>]
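The claim that `read.load` defaults to Parquet can be checked locally. This sketch assumes Spark 1.6 in local mode and that `spark.sql.sources.default` was left unchanged. It writes a two-row stand-in for users.parquet to a temp directory and reads it back both ways. The object name `ParquetDefaultSketch` is hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch; assumes Spark 1.6 with the default data source setting.
object ParquetDefaultSketch {
  def run(): (Seq[String], Seq[String], Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("parquet-default").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val path = Files.createTempDirectory("spark-parquet").toString + "/users.parquet"
      // A small stand-in for examples/src/main/resources/users.parquet
      val users = sc.parallelize(Seq(
        ("Alyssa", Seq(3, 9, 15, 20)),
        ("Ben", Seq.empty[Int]))).toDF("name", "favorite_numbers")
      users.write.parquet(path)
      val viaParquet = sqlContext.read.parquet(path)
      // No format given: load() falls back to spark.sql.sources.default (parquet)
      val viaLoad = sqlContext.read.load(path)
      (viaParquet.columns.toSeq, viaLoad.columns.toSeq, viaLoad.count())
    } finally {
      sc.stop()
    }
  }
}
```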
Writing a DataFrame to MySQL
Step 1: load the Hive table world.city into a DataFrame
scala> val w2mysq_df = sqlContext.read.table("world.city")
w2mysq_df: org.apache.spark.sql.DataFrame = [id: int, name: string, countrycode: string, district: string, population: int]
Step 2: build the JDBC URL of the target MySQL database, appending the user name and password
scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123"
url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123
Step 3: import Properties, because the jdbc method takes (url, target table name, Properties instance). On DataFrameWriter the signature is
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
and the DataFrameReader version used for reading takes the same parameters:
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
  jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}
scala> import java.util.Properties
import java.util.Properties
Step 4: instantiate Properties
scala> val prop = new Properties()
prop: java.util.Properties = {}
Step 5: write the DataFrame to MySQL
scala> w2mysq_df.write.jdbc(url, "fromSparkSQL", prop)
java.sql.SQLException: No suitable driver
Error: no suitable driver was found, so loading the driver with --jars does not work here.
Fix: exit spark-shell, then run
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
and restart spark-shell in the same terminal session:
bin/spark-shell --master local[2]
After repeating steps 1 to 5, the table "fromSparkSQL" exported from Hive to MySQL is visible:
mysql> show tables;
+-----------------+
| Tables_in_world |
+-----------------+
| City            |
| Country         |
| CountryLanguage |
| fromSparkSQL    |
+-----------------+
4 rows in set (0.00 sec)
mysql> select * from fromSparkSQL limit 5;
+------+----------------+-------------+-----------+------------+
| id   | name           | countrycode | district  | population |
+------+----------------+-------------+-----------+------------+
| 1021 | Sawangan       | IDN         | West Java |      91100 |
|    1 | Kabul          | AFG         | Kabol     |    1780000 |
| 1022 | Banyuwangi     | IDN         | East Java |      89900 |
|    2 | Qandahar       | AFG         | Qandahar  |     237500 |
| 1023 | Tanjung Pinang | IDN         | Riau      |      89900 |
+------+----------------+-------------+-----------+------------+
5 rows in set (0.00 sec)
The credentials can also be kept out of the URL and passed through Properties instead, for example when reading the MySQL City table:
scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world"
url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world
scala> val props = new Properties()
props: java.util.Properties = {}
scala> props.put("user","root")
res7: Object = null
scala> props.put("password","123")
res8: Object = null
scala> val mysql_city_df = sqlContext.read.jdbc(url, "City", props)
mysql_city_df: org.apache.spark.sql.DataFrame = [ID: int, Name: string, CountryCode: string, District: string, Population: int]
Example: JOIN world.Country in MySQL with world.city in Hive on countrycode
Connect to the MySQL world database, appending the user name and password:
scala> val url = "jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123"
url: String = jdbc:mysql://hadoop-senior.ibeifeng.com:3306/world?user=root&password=123
Instantiate Properties:
scala> val prop = new Properties()
prop: java.util.Properties = {}
Read world.Country from MySQL into a DataFrame:
scala> val mysql_country_df = sqlContext.read.jdbc(url, "Country", prop)
mysql_country_df: org.apache.spark.sql.DataFrame = [Code: string, Name: string, Continent: string, Region: string, SurfaceArea: double, IndepYear: int, Population: int, LifeExpectancy: double, GNP: double, GNPOld: double, LocalName: string, GovernmentForm: string, HeadOfState: string, Capital: int, Code2: string]
Read world.city from Hive into a DataFrame:
scala> val hive_city_df = sqlContext.read.table("world.city")
hive_city_df: org.apache.spark.sql.DataFrame = [id: int, name: string, countrycode: string, district: string, population: int]
Join the two tables on mysql_country_df("Code") === hive_city_df("countrycode").
join() syntax when the column names differ: leftDF.join(rightDF, leftDF("join column") === rightDF("join column"))
scala> val join_df = mysql_country_df.join(hive_city_df, mysql_country_df("Code")===hive_city_df("countrycode"))
join_df: org.apache.spark.sql.DataFrame = [Code: string, Name: string, Continent: string, Region: string, SurfaceArea: double, IndepYear: int, Population: int, LifeExpectancy: double, GNP: double, GNPOld: double, LocalName: string, GovernmentForm: string, HeadOfState: string, Capital: int, Code2: string, id: int, name: string, countrycode: string, district: string, population: int]
If the join column has the same name on both sides: leftDF.join(rightDF, "joinColumnName")
Register the joined DataFrame as a temporary table with DataFrame.registerTempTable("tempTableName"):
scala> join_df.registerTempTable("join_temp_countrycity")
It can then be queried conveniently with SQL:
scala> sqlContext.sql("select Code, id from join_temp_countrycity limit 5").show
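Both join forms can be tried without MySQL or Hive. This sketch assumes small in-memory stand-ins for world.Country and world.city. The case classes `Country` and `City`, their rows, and the object name `JoinSketch` are hypothetical, and the tables are trimmed to a few columns. Spark 1.6 in local mode is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch; Country/City are tiny stand-ins for world.Country and world.city.
object JoinSketch {
  case class Country(Code: String, Continent: String)
  case class City(id: Int, name: String, countrycode: String)

  def run(): (Seq[(String, Int)], Int) = {
    val sc = new SparkContext(new SparkConf().setAppName("join").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val countryDf = sc.parallelize(Seq(
        Country("AFG", "Asia"), Country("IDN", "Asia"), Country("NLD", "Europe"))).toDF()
      val cityDf = sc.parallelize(Seq(
        City(1, "Kabul", "AFG"), City(2, "Qandahar", "AFG"), City(1021, "Sawangan", "IDN"))).toDF()
      // Different column names: pass an explicit equality condition (inner join, so NLD drops out)
      val joinDf = countryDf.join(cityDf, countryDf("Code") === cityDf("countrycode"))
      joinDf.registerTempTable("join_temp_countrycity")
      val codeAndId = sqlContext.sql("select Code, id from join_temp_countrycity order by id")
        .collect().map(r => (r.getString(0), r.getInt(1))).toSeq
      // Same column name on both sides: join on the name; the key column appears only once
      val usingDf = countryDf.withColumnRenamed("Code", "countrycode").join(cityDf, "countrycode")
      (codeAndId, usingDf.columns.length)
    } finally {
      sc.stop()
    }
  }
}
```

The name-based form returns four columns (countrycode, Continent, id, name). The condition-based form keeps both Code and countrycode.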

Inferring the schema by reflection
(rdd.toDF() works here because spark-shell imports sqlContext.implicits._ at startup.)
scala> case class People(name:String, age:Int)
scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line=>line.split(",")).map(x => People(x(0),x(1).trim.toInt))
scala> val people_df = rdd.toDF()
scala> people_df.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
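Outside spark-shell, the implicits must be imported by hand. This sketch assumes Spark 1.6 in local mode and uses the three lines of people.txt as in-memory data. The object name `ReflectionSchemaSketch` is hypothetical. The case class sits at object level rather than inside `main`, so that Spark can obtain a TypeTag for it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch of schema inference by reflection; assumes Spark 1.6.
object ReflectionSchemaSketch {
  case class People(name: String, age: Int)

  def run(): (String, Seq[(String, Int)]) = {
    val sc = new SparkContext(new SparkConf().setAppName("reflection").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._ // spark-shell does this automatically
      val lines = sc.parallelize(Seq("Michael, 29", "Andy, 30", "Justin, 19"))
      // Column names and types come from the case class fields
      val peopleDf = lines.map(_.split(",")).map(x => People(x(0), x(1).trim.toInt)).toDF()
      peopleDf.show()
      val schema = peopleDf.schema.fields.map(f => s"${f.name}:${f.dataType.simpleString}").mkString(",")
      (schema, peopleDf.collect().map(r => (r.getString(0), r.getInt(1))).toSeq)
    } finally {
      sc.stop()
    }
  }
}
```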
Building a schema programmatically and applying it to a DataFrame
--1, RDD -> RDD[Row]
scala> import org.apache.spark.sql._
scala> val rdd = sc.textFile("/user/beifeng/people.txt")
//scala> rdd.collect
//res7: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
//rdd.map(line=>line.split(", ")).collect
//res6: Array[Array[String]] = Array(Array(Michael, 29), Array(Andy, 30), Array(Justin, 19))
scala> val rowRdd = rdd.map(line=>line.split(", ")).map(x=>Row(x(0), x(1).toInt))
//scala> rowRdd.collect
//res8: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
--2, Create the schema
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
--3, Apply the schema to the RDD of Rows
scala> val people_df = sqlContext.createDataFrame(rowRdd, schema)
people_df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> people_df.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
Official demo code from Spark 1.6.1:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
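The official demo types every column as StringType. Here is a minimal variant that keeps age as an integer, so it can be compared in SQL. It assumes Spark 1.6 in local mode and uses people.txt as in-memory lines. The object name `ProgrammaticSchemaSketch` is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical sketch of the three steps above; assumes Spark 1.6.
object ProgrammaticSchemaSketch {
  def run(): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setAppName("schema").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      val lines = sc.parallelize(Seq("Michael, 29", "Andy, 30", "Justin, 19"))
      // 1. RDD[String] -> RDD[Row]; each value's type must match the schema below
      val rowRdd = lines.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
      // 2. Build the schema explicitly
      val schema = StructType(Seq(
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true)))
      // 3. Apply the schema, register a temp table, and query it
      val peopleDf = sqlContext.createDataFrame(rowRdd, schema)
      peopleDf.registerTempTable("people")
      sqlContext.sql("SELECT name FROM people WHERE age >= 20").collect().map(_.getString(0)).toSeq
    } finally {
      sc.stop()
    }
  }
}
```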
Converting a DataFrame to other forms when needed
When needed, a DataFrame can be converted to an RDD, whose element type becomes Row:
scala> people_df.rdd
res5: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[8] at rdd at <console>:42
For asynchronous transfer to a front end, convert it to JSON:
scala> people_df.toJSON.collect
res8: Array[String] = Array({"name":"Michael","age":29}, {"name":"Andy","age":30}, {"name":"Justin","age":19})
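Both conversions appear together in this standalone sketch. It assumes Spark 1.6 in local mode, and the object name `DataFrameConversionSketch` is hypothetical. `df.rdd` yields an `RDD[Row]`, read here by position. In Spark 1.6, `toJSON` yields an `RDD[String]` with one JSON object per row.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch; assumes Spark 1.6.
object DataFrameConversionSketch {
  def run(): (Seq[String], Seq[String]) = {
    val sc = new SparkContext(new SparkConf().setAppName("convert").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val df = sc.parallelize(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19))).toDF("name", "age")
      // DataFrame -> RDD[Row]; fields are accessed by position
      val names = df.rdd.map(row => row.getString(0)).collect().toSeq
      // DataFrame -> RDD[String] of JSON documents
      val json = df.toJSON.collect().toSeq
      (names, json)
    } finally {
      sc.stop()
    }
  }
}
```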
SaveMode
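SaveMode controls what a write does when the target already exists. The default is ErrorIfExists, so re-running the earlier `w2mysq_df.write.jdbc(url, "fromSparkSQL", prop)` would fail once the table exists. `w2mysq_df.write.mode(SaveMode.Append).jdbc(url, "fromSparkSQL", prop)` would append instead. The sketch below shows the four modes against a Parquet path, so it needs no database. It assumes Spark 1.6 in local mode and a temp directory, and the object name `SaveModeSketch` is hypothetical.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

// Hypothetical sketch of the four save modes; assumes Spark 1.6.
object SaveModeSketch {
  def run(): (Long, Long, Boolean, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("savemode").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val path = Files.createTempDirectory("spark-savemode").toString + "/people"
      val df = sc.parallelize(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19))).toDF("name", "age")
      df.write.mode(SaveMode.ErrorIfExists).parquet(path) // path is new, so this succeeds
      df.write.mode(SaveMode.Append).parquet(path)        // adds the same 3 rows again
      val afterAppend = sqlContext.read.parquet(path).count()
      df.write.mode(SaveMode.Ignore).parquet(path)        // data exists: silently skipped
      val afterIgnore = sqlContext.read.parquet(path).count()
      // ErrorIfExists on an existing path throws
      val errorRaised = Try(df.write.mode(SaveMode.ErrorIfExists).parquet(path)).isFailure
      df.write.mode(SaveMode.Overwrite).parquet(path)     // replaces the existing data
      val afterOverwrite = sqlContext.read.parquet(path).count()
      (afterAppend, afterIgnore, errorRaised, afterOverwrite)
    } finally {
      sc.stop()
    }
  }
}
```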
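https://github.com/databricks
spark-csv: reads CSV text files, e.g. for loading a CSV file into a Hive table and then reading it from Hive. To use it, download the project and build it into a jar.
spark-avro: reads Avro data; Avro is a serialization framework in the Hadoop ecosystem and an Apache top-level project, http://avro.apache.org/
spark-xml: reads XML files
learning-spark: source code for the book Learning Spark
reference-apps: the source of this course's reference examples
https://spark-packages.org/
spork
spark-jobserver (worth a careful look when time allows)
zeppelin
spark-hbase
spark-mongodb
spark-sorted
spark-kafka
spark-notebook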


Datasets: objects are serialized with Encoders for network transfer and disk reads/writes
scala> val ds = Seq(1, 2, 3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.map(_ + 1).collect
res0: Array[Int] = Array(2, 3, 4)
scala> val df = ds.toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> case class Person(name:String, age:Long)
defined class Person
//This line uses the People case class defined earlier (age: Int), not Person
scala> val ds = Seq(People("Andy",32)).toDS()
ds: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
//Converting a DataFrame to a Dataset
scala> val json_df = sqlContext.read.json("people.json")
json_df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
//The case class must match the DataFrame: both field names and field types (JSON numbers are inferred as bigint, hence age: Long)
scala> val json_ds = json_df.as[Person]
json_ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
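The same steps can be run as a standalone program. This sketch assumes Spark 1.6 in local mode. The JSON records are in-memory stand-ins that omit Michael, whose age is null. The object name `DatasetSketch` is hypothetical. `as[Person]` matches DataFrame columns to case-class fields by name, and `age` is declared as Long to match the inferred bigint.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch of typed Datasets; assumes Spark 1.6.
object DatasetSketch {
  case class Person(name: String, age: Long)

  def run(): (Seq[Int], Seq[String]) = {
    val sc = new SparkContext(new SparkConf().setAppName("dataset").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._ // provides toDS() and the Encoders
      val plusOne = Seq(1, 2, 3).toDS().map(_ + 1).collect().toSeq
      val jsonDf = sqlContext.read.json(sc.parallelize(Seq(
        """{"name":"Andy", "age":30}""",
        """{"name":"Justin", "age":19}""")))
      // DataFrame -> Dataset[Person]; lambdas now see typed Person objects
      val people = jsonDf.as[Person]
      val described = people.map(p => s"${p.name} is ${p.age}").collect().toSeq
      (plusOne, described)
    } finally {
      sc.stop()
    }
  }
}
```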
Related articles:
-1,https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
-2,https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
Using ES (Elasticsearch) as a secondary index