@Arslan6and6
2016-10-10T06:53:54.000000Z
spark
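Official Spark 1.6.1 example code (from the "Programmatically Specifying the Schema" section of the Spark SQL programming guide):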
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
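The core of the example is that the schema is built at runtime from a plain string, and each text line becomes a positional row whose i-th value is paired with the i-th field name. The plain-Scala sketch below has no Spark dependency and only mimics those two steps; the object, case class and method names are hypothetical, and the sample lines are illustrative values in the `name, age` format. It also shows why `p(1).trim` is needed after splitting on a comma.

```scala
object SchemaFromString {
  // Hypothetical stand-in for StructField(name, StringType, nullable = true)
  final case class Field(name: String, nullable: Boolean = true)

  // Mirrors: schemaString.split(" ").map(fieldName => StructField(...))
  def buildSchema(schemaString: String): Seq[Field] =
    schemaString.split(" ").toSeq.map(name => Field(name))

  // Mirrors: people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
  def toRow(line: String): Seq[String] = {
    val p = line.split(",")
    Seq(p(0), p(1).trim) // trim drops the space that follows the comma
  }

  // Pair each positional value with the field name at the same index
  def named(schema: Seq[Field], row: Seq[String]): Map[String, String] =
    schema.map(_.name).zip(row).toMap

  def main(args: Array[String]): Unit = {
    val schema = buildSchema("name age")
    val lines = Seq("Michael, 29", "Andy, 30") // illustrative sample lines
    // Rough equivalent of SELECT name FROM people, then "Name: " + t(0)
    lines.map(toRow).map(r => "Name: " + named(schema, r)("name")).foreach(println)
  }
}
```

Spark does the name-to-position resolution inside its query planner; the `zip` here only illustrates that a `StructType` is, conceptually, an ordered list of field names laid over positional `Row` values.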
A self-made test file, /user/beifeng/mapTest01.txt, with the following contents (the columns are tab-separated):
AA BB CC NN EE FF GG
EE BB GG DD BB KK EE
AA ZZ XX DD EE FF GG
NN BB CC DD EE FF AA
AA BB CC DD EE EE GG
AA RR CC DD TT FF XX
YY BB VV MM EE FF GG
AA BB CC DD EE FF GG
Demo code:
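// Load the file into an RDD
scala> val rdd = sc.textFile("/user/beifeng/mapTest01.txt")
// Import Row and the Spark SQL data types (already in scope if the official example above was run in the same shell)
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType}
// Build the field-name string
scala> val schemaString = "col1 col2 col3 col4 col5 col6 col7"
// Build the schema
scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Split each line on tabs and convert it to a Row
scala> val rowRDD = rdd.map(_.split("\t")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
// Apply the schema to create a DataFrame
scala> val case_df = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrame as a temporary table (this does not write a Hive table)
scala> case_df.registerTempTable("case_test02")
// Run a SQL query to test the result
scala> val results = sqlContext.sql("SELECT col6 FROM case_test02")
// Print the results
scala> results.map(t => "col6: " + t(0)).collect().foreach(println)
col6: FF
col6: KK
col6: FF
col6: FF
col6: EE
col6: FF
col6: FF
col6: FF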
This shows that structured data with no field names can be given a schema this way and then queried with SQL statements.
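The conclusion can be modelled without Spark: the schema is just an ordered list of names, so `col6` resolves to index 5 of every tab-split line. The plain-Scala sketch below (object and method names are hypothetical, and this is not how Spark evaluates the query internally) reproduces `SELECT col6 FROM case_test02` over the eight lines of mapTest01.txt, assuming they are tab-separated.

```scala
object SelectByName {
  // Same field-name string as the demo
  val schemaString = "col1 col2 col3 col4 col5 col6 col7"

  // Contents of mapTest01.txt, written with spaces for readability
  // and converted to the tab-separated form the demo splits on
  val lines: Seq[String] = Seq(
    "AA BB CC NN EE FF GG",
    "EE BB GG DD BB KK EE",
    "AA ZZ XX DD EE FF GG",
    "NN BB CC DD EE FF AA",
    "AA BB CC DD EE EE GG",
    "AA RR CC DD TT FF XX",
    "YY BB VV MM EE FF GG",
    "AA BB CC DD EE FF GG"
  ).map(_.replace(' ', '\t'))

  // Resolve a column name to its position, then project that position from every row
  def select(column: String, data: Seq[String]): Seq[String] = {
    val index = schemaString.split(" ").indexOf(column)
    require(index >= 0, s"unknown column: $column")
    data.map(_.split("\t")).map(p => p(index))
  }

  def main(args: Array[String]): Unit =
    select("col6", lines).map(v => "col6: " + v).foreach(println)
}
```

Running `main` prints the same eight `col6:` lines as the spark-shell session. In Spark itself, an unknown column name fails at query analysis time rather than through a `require` check.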