@Arslan6and6
2016-10-10T06:50:05.000000Z
spark
// Create the table emp in Hive's default database
hive (default)> create table emp(
> empno int,
> ename string,
> job string,
> mgr int,
> hiredate string,
> sal double,
> comm double,
> deptno int
> )
> row format delimited fields terminated by '\t';
OK
Time taken: 1.248 seconds
hive (default)> show tables;
OK
tab_name
emp
Time taken: 0.024 seconds, Fetched: 1 row(s)
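The queries below assume emp has been populated. The notes do not show the load step; with a tab-delimited text file it would look something like the following (the local path is made up for illustration):
hive (default)> load data local inpath '/opt/datas/emp.txt' into table emp;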
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
// List the tables that exist in Hive
scala> sqlContext.sql("show tables").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| emp| false|
+---------+-----------+
sqlContext.sql("select * from emp").show //或者 sqlContext.read.table("emp").show
+-----+------+---------+----+----------+------+------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+------+---------+----+----------+------+------+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| null| 20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null| 20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| null| 30|
| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| null| 10|
| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| null| 20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.0| null| 10|
| 7844|TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| null| 20|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| null| 30|
| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null| 20|
| 7934|MILLER| CLERK|7782| 1982-1-23|1300.0| null| 10|
+-----+------+---------+----+----------+------+------+------+
scala> val emp_df = sqlContext.sql("select * from emp")
emp_df: org.apache.spark.sql.DataFrame = [empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: double, comm: double, deptno: int]
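As a quick check, the standard DataFrame method printSchema prints the column names, types, and nullability as a tree matching the signature above:
scala> emp_df.printSchema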
DSL syntax
// First form: varargs of (column -> function) pairs
scala> emp_df.agg("sal" -> "avg", "comm" -> "max").show
+-----------------+---------+
| avg(sal)|max(comm)|
+-----------------+---------+
|2073.214285714286| 1400.0|
+-----------------+---------+
// Second form: pass a Map of column -> function
scala> emp_df.agg(Map("sal"->"avg", "comm"->"max")).show
+-----------------+---------+
| avg(sal)|max(comm)|
+-----------------+---------+
|2073.214285714286| 1400.0|
+-----------------+---------+
// Third form: column expressions, e.g. df.agg(max($"age"), avg($"salary"))
// DSL stands for Domain Specific Language
scala> emp_df.agg(avg($"sal"), max($"comm")).show
+-----------------+---------+
| avg(sal)|max(comm)|
+-----------------+---------+
|2073.214285714286| 1400.0|
+-----------------+---------+
// Fourth form: combine groupBy with agg, e.g.
/* df.groupBy($"department", $"gender").agg(Map(
 *   "salary" -> "avg",
 *   "age" -> "max"))
 */
scala> emp_df.groupBy($"deptno").agg(avg($"sal"), max($"comm")).show
+------+------------------+---------+
|deptno| avg(sal)|max(comm)|
+------+------------------+---------+
| 10|2916.6666666666665| null|
| 20| 2175.0| null|
| 30|1566.6666666666667| 1400.0|
+------+------------------+---------+
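The Map form from the comment above also combines with groupBy; on emp the following is equivalent to the previous query and produces the same three-row table:
scala> emp_df.groupBy($"deptno").agg(Map("sal" -> "avg", "comm" -> "max")).show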
// Register a user-defined function (UDF)
scala> sqlContext.udf.register(
     |   "comm_trans",              // function name
     |   (comm: Double) => {        // anonymous function
     |     // NOTE: comm is a primitive Double, so comm.toString is never "null";
     |     // null inputs therefore come through unchanged, as the output below shows
     |     if (comm.toString == "null") {
     |       0.0
     |     } else {
     |       comm
     |     }
     |   }
     | )
res22: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,List(DoubleType))
scala> sqlContext.sql("select empno, ename, sal, comm_trans(comm) cm from emp").show
+-----+------+------+------+
|empno| ename| sal| cm|
+-----+------+------+------+
| 7369| SMITH| 800.0| null|
| 7499| ALLEN|1600.0| 300.0|
| 7521| WARD|1250.0| 500.0|
| 7566| JONES|2975.0| null|
| 7654|MARTIN|1250.0|1400.0|
| 7698| BLAKE|2850.0| null|
| 7782| CLARK|2450.0| null|
| 7788| SCOTT|3000.0| null|
| 7839| KING|5000.0| null|
| 7844|TURNER|1500.0| 0.0|
| 7876| ADAMS|1100.0| null|
| 7900| JAMES| 950.0| null|
| 7902| FORD|3000.0| null|
| 7934|MILLER|1300.0| null|
+-----+------+------+------+
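To actually replace a null commission with 0.0, one option (a sketch, not from the original notes) is to handle the null in SQL with coalesce, which both Hive and Spark SQL provide:
scala> sqlContext.sql("select empno, ename, sal, coalesce(comm, 0.0) cm from emp").show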
scala> sqlContext.sql("select deptno, avg(sal) avg_sal from emp group by deptno").show
+------+------------------+
|deptno| avg_sal|
+------+------------------+
| 10|2916.6666666666665|
| 20| 2175.0|
| 30|1566.6666666666667|
+------+------------------+
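Built-in aggregate functions like avg are one option; the same per-department average can also be implemented as a user-defined aggregate function (UDAF) by extending UserDefinedAggregateFunction: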
package com.ibeifeng.bigdata.spark.app.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Created by XuanYu on 2016/8/6.
 */
object AvgSalUDAF extends UserDefinedAggregateFunction {
  /**
   * Name and type of the input field(s).
   */
  override def inputSchema: StructType = StructType(
    StructField("sal", DoubleType, true) :: Nil
  )

  /**
   * Fold one input row into the aggregation buffer.
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // read the current buffer values
    val salTotal = buffer.getDouble(0)
    val salCount = buffer.getInt(1)
    // read the incoming value
    val inputSal = input.getDouble(0)
    // write the updated values back
    buffer.update(0, salTotal + inputSal)
    buffer.update(1, salCount + 1)
  }

  /**
   * Names and types of the buffer fields: a running total and a count.
   */
  override def bufferSchema: StructType = StructType(
    StructField("sal_total", DoubleType, true) ::
    StructField("sal_count", IntegerType, true) :: Nil
  )

  /**
   * Merges two aggregation buffers and stores the updated buffer values
   * back into `buffer1`. This is called when two partially aggregated
   * buffers (e.g. from different partitions) are combined.
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // values from the first buffer
    val salTotal1 = buffer1.getDouble(0)
    val salCount1 = buffer1.getInt(1)
    // values from the second buffer
    val salTotal2 = buffer2.getDouble(0)
    val salCount2 = buffer2.getInt(1)
    // store the combined values back into buffer1
    buffer1.update(0, salTotal1 + salTotal2)
    buffer1.update(1, salCount1 + salCount2)
  }

  /**
   * Initialize the buffer fields.
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0)
  }

  /**
   * Whether the function always returns the same result for the same input.
   */
  override def deterministic: Boolean = true

  /**
   * Compute the final result from the buffer.
   * @param buffer
   * @return
   */
  override def evaluate(buffer: Row): Any = {
    val salTotal = buffer.getDouble(0)
    val salCount = buffer.getInt(1)
    // return the average
    salTotal / salCount
  }

  /**
   * Type of the value returned by evaluate.
   */
  override def dataType: DataType = DoubleType
}
sqlContext.udf.register(
"avg_sal",
AvgSalUDAF
)
scala> sqlContext.sql("select deptno, avg(sal) avg_sal, avg_sal(sal) as_self from emp group by deptno").show
+------+------------------+------------------+
|deptno| avg_sal| as_self|
+------+------------------+------------------+
| 10|2916.6666666666665|2916.6666666666665|
| 20| 2175.0| 2175.0|
| 30|1566.6666666666667|1566.6666666666667|
+------+------------------+------------------+
import org.apache.spark.sql.types._

Data type     | Value type in Scala      | API to access or create a data type
ByteType      | Byte                     | ByteType
ShortType     | Short                    | ShortType
IntegerType   | Int                      | IntegerType
LongType      | Long                     | LongType
FloatType     | Float                    | FloatType
DoubleType    | Double                   | DoubleType
DecimalType   | java.math.BigDecimal     | DecimalType
StringType    | String                   | StringType
BinaryType    | Array[Byte]              | BinaryType
BooleanType   | Boolean                  | BooleanType
TimestampType | java.sql.Timestamp       | TimestampType
DateType      | java.sql.Date            | DateType
ArrayType     | scala.collection.Seq     | ArrayType(elementType, [containsNull]); containsNull defaults to true
MapType       | scala.collection.Map     | MapType(keyType, valueType, [valueContainsNull]); valueContainsNull defaults to true
StructType    | org.apache.spark.sql.Row | StructType(fields); fields is a Seq of StructFields, and two fields with the same name are not allowed
StructField   | the Scala value type of the field's data type (e.g. Int for IntegerType) | StructField(name, dataType, nullable)
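As a quick illustration (the schema below is made up for this example, not from the original notes), these constructors compose into a schema like the one Spark inferred for emp:
import org.apache.spark.sql.types._

// hypothetical hand-built schema for an emp-like table
val empSchema = StructType(
  StructField("empno", IntegerType, nullable = false) ::
  StructField("ename", StringType, nullable = true) ::
  StructField("sal", DoubleType, nullable = true) ::
  StructField("comm", DoubleType, nullable = true) :: Nil
)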
Window functions: ROW_NUMBER() OVER(...)
// ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk assigns each row a rank:
// rows are partitioned by deptno, sorted by sal in descending order within each partition,
// and numbered 1, 2, 3, ... in that order
SELECT
empno, ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
FROM
emp;
empno  ename   sal     deptno  rnk
7839   KING    5000.0  10      1
7782   CLARK   2450.0  10      2
7934   MILLER  1300.0  10      3
7788   SCOTT   3000.0  20      1
7902   FORD    3000.0  20      2
7566   JONES   2975.0  20      3
7876   ADAMS   1100.0  20      4
7369   SMITH   800.0   20      5
7698   BLAKE   2850.0  30      1
7499   ALLEN   1600.0  30      2
7844   TURNER  1500.0  30      3
7521   WARD    1250.0  30      4
7654   MARTIN  1250.0  30      5
7900   JAMES   950.0   30      6
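ROW_NUMBER is typically used for top-N-per-group queries. As a sketch (not part of the original notes), wrapping the query above in a subquery and filtering on rnk returns the two highest-paid employees in each department; the same statement runs through sqlContext.sql in the spark-shell:
scala> sqlContext.sql("SELECT empno, ename, sal, deptno, rnk FROM (SELECT empno, ename, sal, deptno, ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk FROM emp) t WHERE rnk <= 2").show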