@Arslan6and6
2016-10-10T06:50:05.000000Z
字数 7383
阅读 1333
spark
//hive数据库default中存在表 emp
hive (default)> create table emp(
              > empno int,
              > ename string,
              > job string,
              > mgr int,
              > hiredate string,
              > sal double,
              > comm double,
              > deptno int
              > )
              > row format delimited fields terminated by '\t';
OK
Time taken: 1.248 seconds
hive (default)> show tables;
OK
tab_name
emp
Time taken: 0.024 seconds, Fetched: 1 row(s)
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar//查询 hive 中存在的表scala> sqlContext.sql("show tables").show+---------+-----------+|tableName|isTemporary|+---------+-----------+| emp| false|+---------+-----------+sqlContext.sql("select * from emp").show //或者 sqlContext.read.table("emp").show+-----+------+---------+----+----------+------+------+------+|empno| ename| job| mgr| hiredate| sal| comm|deptno|+-----+------+---------+----+----------+------+------+------+| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| null| 20|| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30|| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30|| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null| 20|| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30|| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| null| 30|| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| null| 10|| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| null| 20|| 7839| KING|PRESIDENT|null|1981-11-17|5000.0| null| 10|| 7844|TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30|| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| null| 20|| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| null| 30|| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null| 20|| 7934|MILLER| CLERK|7782| 1982-1-23|1300.0| null| 10|+-----+------+---------+----+----------+------+------+------+scala> val emp_df = sqlContext.sql("select * from emp")emp_df: org.apache.spark.sql.DataFrame = [empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: double, comm: double, deptno: int]
DSL语言
//第一种写法emp_df.agg("sal"->"avg", "comm"->"max").show+-----------------+---------+| avg(sal)|max(comm)|+-----------------+---------+|2073.214285714286| 1400.0|+-----------------+---------+//第二种写法scala> emp_df.agg(Map("sal"->"avg", "comm"->"max")).show+-----------------+---------+| avg(sal)|max(comm)|+-----------------+---------+|2073.214285714286| 1400.0|+-----------------+---------+//第三种写法 df.agg(max($"age"), avg($"salary"))//即 DSL全称Domain Specific Language,中文解释为领域专用语言scala> emp_df.agg(avg($"sal"), max($"comm")).show+-----------------+---------+| avg(sal)|max(comm)|+-----------------+---------+|2073.214285714286| 1400.0|+-----------------+---------+//第四种/* df.groupBy($"department", $"gender").agg(Map(* "salary" -> "avg",* "age" -> "max"))scala> emp_df.groupBy($"deptno").agg(avg($"sal"), max($"comm")).show+------+------------------+---------+|deptno| avg(sal)|max(comm)|+------+------------------+---------+| 10|2916.6666666666665| null|| 20| 2175.0| null|| 30|1566.6666666666667| 1400.0|+------+------------------+---------+
//注册UDF自定义函数scala> sqlContext.udf.register(| "comm_trans", //函数名| (comm: Double) => { //匿名函数| if (comm.toString == "null") {| 0.0| } else {| comm| }| }| )res22: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,List(DoubleType))scala> sqlContext.sql("select empno, ename, sal, comm_trans(comm) cm from emp").show+-----+------+------+------+|empno| ename| sal| cm|+-----+------+------+------+| 7369| SMITH| 800.0| null|| 7499| ALLEN|1600.0| 300.0|| 7521| WARD|1250.0| 500.0|| 7566| JONES|2975.0| null|| 7654|MARTIN|1250.0|1400.0|| 7698| BLAKE|2850.0| null|| 7782| CLARK|2450.0| null|| 7788| SCOTT|3000.0| null|| 7839| KING|5000.0| null|| 7844|TURNER|1500.0| 0.0|| 7876| ADAMS|1100.0| null|| 7900| JAMES| 950.0| null|| 7902| FORD|3000.0| null|| 7934|MILLER|1300.0| null|+-----+------+------+------+


scala> sqlContext.sql("select deptno, avg(sal) avg_sal from emp group by deptno").show+------+------------------+|deptno| avg_sal|+------+------------------+| 10|2916.6666666666665|| 20| 2175.0|| 30|1566.6666666666667|+------+------------------+
package com.ibeifeng.bigdata.spark.app.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Created by XuanYu on 2016/8/6.
 *
 * User-defined aggregate function that computes the average of a Double
 * `sal` column.
 *
 * It mirrors the semantics of the built-in `avg`:
 *  - null inputs are ignored (neither summed nor counted);
 *  - a group with no non-null inputs evaluates to null rather than NaN.
 */
object AvgSalUDAF extends UserDefinedAggregateFunction {

  /** Name and type of the single input column. */
  override def inputSchema: StructType =
    StructType(StructField("sal", DoubleType, nullable = true) :: Nil)

  /**
   * Layout of the intermediate aggregation buffer:
   * index 0 = running sum of salaries, index 1 = number of non-null salaries.
   * The count is a Long so very large groups cannot overflow it.
   */
  override def bufferSchema: StructType = StructType(
    StructField("sal_total", DoubleType, nullable = true) ::
      StructField("sal_count", LongType, nullable = true) :: Nil)

  /** Type of the value returned by [[evaluate]]. */
  override def dataType: DataType = DoubleType

  /** The same input always produces the same output. */
  override def deterministic: Boolean = true

  /**
   * Initializes the buffer fields to an empty sum and a zero count.
   *
   * @param buffer the aggregation buffer to initialize
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0L)
  }

  /**
   * Folds one input row into the buffer.
   *
   * @param buffer the aggregation buffer being updated
   * @param input  a row matching [[inputSchema]]
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Skip null salaries, as the built-in avg does; reading a null with
    // getDouble would otherwise count it as 0.0 (or fail, depending on version).
    if (!input.isNullAt(0)) {
      buffer.update(0, buffer.getDouble(0) + input.getDouble(0))
      buffer.update(1, buffer.getLong(1) + 1L)
    }
  }

  /**
   * Merges two partially aggregated buffers and stores the result in `buffer1`.
   * This is called when partial aggregates from different partitions are combined.
   *
   * @param buffer1 the buffer that receives the merged values
   * @param buffer2 the partial buffer to merge in
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
  }

  /**
   * Produces the final average from the buffer.
   *
   * @param buffer the fully merged aggregation buffer
   * @return the average salary, or null when no non-null salary was seen
   */
  override def evaluate(buffer: Row): Any = {
    val salCount = buffer.getLong(1)
    // Return null instead of NaN (0.0 / 0) for an empty group, matching avg.
    if (salCount == 0L) null else buffer.getDouble(0) / salCount
  }
}
// Register the AvgSalUDAF object under the SQL function name "avg_sal" so SQL queries can call avg_sal(...).
sqlContext.udf.register("avg_sal",AvgSalUDAF)
scala> sqlContext.sql("select deptno, avg(sal) avg_sal, avg_sal(sal) as_self from emp group by deptno").show+------+------------------+------------------+|deptno| avg_sal| as_self|+------+------------------+------------------+| 10|2916.6666666666665|2916.6666666666665|| 20| 2175.0| 2175.0|| 30|1566.6666666666667|1566.6666666666667|+------+------------------+------------------+
import org.apache.spark.sql.types._

| Data type | Value type in Scala | API to access or create a data type |
|---|---|---|
| ByteType | Byte | ByteType |
| ShortType | Short | ShortType |
| IntegerType | Int | IntegerType |
| LongType | Long | LongType |
| FloatType | Float | FloatType |
| DoubleType | Double | DoubleType |
| DecimalType | java.math.BigDecimal | DecimalType |
| StringType | String | StringType |
| BinaryType | Array[Byte] | BinaryType |
| BooleanType | Boolean | BooleanType |
| TimestampType | java.sql.Timestamp | TimestampType |
| DateType | java.sql.Date | DateType |
| ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]). Note: The default value of containsNull is true. |
| MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]). Note: The default value of valueContainsNull is true. |
| StructType | org.apache.spark.sql.Row | StructType(fields). Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed. |
| StructField | The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, nullable) |
窗口函数之 ROW_NUMBER() OVER(标记条件) 标记字段
//此处ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk 的意思为给每行标记行号,标记条件为OVER()内的条件,即按照 deptno 分组后按照组内的 sal 降序排列,给各个组的每行标记行号(sal 相同的行,如 SCOTT 与 FORD,其先后顺序不保证固定)
SELECT
empno, ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
FROM
emp;

empno   ename   sal     deptno  rnk
7839    KING    5000.0  10      1
7782    CLARK   2450.0  10      2
7934    MILLER  1300.0  10      3
7788    SCOTT   3000.0  20      1
7902    FORD    3000.0  20      2
7566    JONES   2975.0  20      3
7876    ADAMS   1100.0  20      4
7369    SMITH   800.0   20      5
7698    BLAKE   2850.0  30      1
7499    ALLEN   1600.0  30      2
7844    TURNER  1500.0  30      3
7521    WARD    1250.0  30      4
7654    MARTIN  1250.0  30      5
7900    JAMES   950.0   30      6