@Arslan6and6
2016-10-10T06:53:54.000000Z
spark
Official Spark 1.6.1 example code:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
My own test file, /user/beifeng/mapTest01.txt, has the following contents:
AA BB CC NN EE FF GG
EE BB GG DD BB KK EE
AA ZZ XX DD EE FF GG
NN BB CC DD EE FF AA
AA BB CC DD EE EE GG
AA RR CC DD TT FF XX
YY BB VV MM EE FF GG
AA BB CC DD EE FF GG
The demo code is as follows:
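//Import Row and the Spark SQL data types
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType}
//Load the file into an RDD
scala> val rdd = sc.textFile("/user/beifeng/mapTest01.txt")
//Build the string of field names
scala> val schemaString = "col1 col2 col3 col4 col5 col6 col7"
//Build the schema from the field names
scala> val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
//Split each line on tabs and convert the RDD records to Rows
scala> val rowRDD = rdd.map(_.split("\t")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
//Apply the schema to the RDD to create a DataFrame
scala> val case_df = sqlContext.createDataFrame(rowRDD, schema)
//Register the DataFrame as a temporary table
scala> case_df.registerTempTable("case_test02")
//Run a SQL query to test the result
scala> val results = sqlContext.sql("SELECT col6 FROM case_test02")
//Print the query result
scala> results.map(t => "col6: " + t(0)).collect().foreach(println)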
col6: FF
col6: KK
col6: FF
col6: FF
col6: EE
col6: FF
col6: FF
col6: FF
This shows that structured data without field names can be given a schema in this way and then queried with SQL statements.
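The demo hard-codes seven column names and indexes p(0) through p(6) by hand. As a minimal sketch, assuming the file is tab-delimited as in the demo, the placeholder names could instead be generated from the column count, and lines with the wrong number of fields could be rejected before they become Rows. The helpers makeSchemaString and parseLine are hypothetical names introduced here for illustration. They are plain Scala and are not part of Spark or of the original code.

```scala
// Hypothetical helpers for illustration only; not part of Spark.

// Build a space-separated list of placeholder column names: "col1 col2 ... colN".
def makeSchemaString(numCols: Int): String =
  (1 to numCols).map(i => s"col$i").mkString(" ")

// Split one tab-delimited line into fields. The limit -1 keeps trailing
// empty fields. Returns None unless the line has exactly numCols fields.
def parseLine(line: String, numCols: Int): Option[List[String]] = {
  val fields = line.split("\t", -1)
  if (fields.length == numCols) Some(fields.toList) else None
}

// Assumed sample line, mirroring the first row of the test file.
val sample = "AA\tBB\tCC\tNN\tEE\tFF\tGG"
val schemaString = makeSchemaString(sample.split("\t", -1).length)
val parsed = parseLine(sample, 7)
val rejected = parseLine("AA\tBB", 7)
```

In spark-shell, one possible way to plug this in would be rdd.flatMap(line => parseLine(line, 7)).map(fields => Row.fromSeq(fields)), which silently drops malformed lines instead of failing on an out-of-range index. Whether dropping such lines is acceptable depends on the data.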