@Arslan6and6 · 2016-10-10

Tags: sparkSQL, spark

I. Aggregate Functions in SparkSQL

  // The table emp is created in the Hive database `default`
  hive (default)> create table emp(
                > empno int,
                > ename string,
                > job string,
                > mgr int,
                > hiredate string,
                > sal double,
                > comm double,
                > deptno int
                > )
                > row format delimited fields terminated by '\t';
  OK
  Time taken: 1.248 seconds
  hive (default)> show tables;
  OK
  tab_name
  emp
  Time taken: 0.024 seconds, Fetched: 1 row(s)
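
The CREATE TABLE above only defines an empty table; the rows queried below must have been loaded into it beforehand. A typical way to do that is a LOAD DATA statement (the local path and file name here are assumptions, not from the original post):

  hive (default)> load data local inpath '/opt/datas/emp.txt' into table emp;
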
  [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar

  // List the tables visible in Hive
  scala> sqlContext.sql("show tables").show
  +---------+-----------+
  |tableName|isTemporary|
  +---------+-----------+
  |      emp|      false|
  +---------+-----------+

  // or equivalently: sqlContext.read.table("emp").show
  scala> sqlContext.sql("select * from emp").show
  +-----+------+---------+----+----------+------+------+------+
  |empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|
  +-----+------+---------+----+----------+------+------+------+
  | 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|  null|    20|
  | 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|
  | 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|
  | 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|  null|    20|
  | 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|
  | 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|  null|    30|
  | 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|  null|    10|
  | 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|  null|    20|
  | 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|  null|    10|
  | 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|
  | 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|  null|    20|
  | 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|  null|    30|
  | 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|  null|    20|
  | 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|  null|    10|
  +-----+------+---------+----+----------+------+------+------+

  scala> val emp_df = sqlContext.sql("select * from emp")
  emp_df: org.apache.spark.sql.DataFrame = [empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: double, comm: double, deptno: int]

The DSL (Domain Specific Language) API

1. The agg aggregate method on DataFrame

  // First form: varargs of (columnName -> aggregateFunctionName) pairs
  scala> emp_df.agg("sal" -> "avg", "comm" -> "max").show
  +-----------------+---------+
  |         avg(sal)|max(comm)|
  +-----------------+---------+
  |2073.214285714286|   1400.0|
  +-----------------+---------+

  // Second form: a Map from column name to aggregate function name
  scala> emp_df.agg(Map("sal" -> "avg", "comm" -> "max")).show
  +-----------------+---------+
  |         avg(sal)|max(comm)|
  +-----------------+---------+
  |2073.214285714286|   1400.0|
  +-----------------+---------+

  // Third form: column expressions, e.g. df.agg(max($"age"), avg($"salary"))
  // (DSL stands for Domain Specific Language)
  scala> emp_df.agg(avg($"sal"), max($"comm")).show
  +-----------------+---------+
  |         avg(sal)|max(comm)|
  +-----------------+---------+
  |2073.214285714286|   1400.0|
  +-----------------+---------+

  // Fourth form: grouped aggregation, e.g.
  //   df.groupBy($"department", $"gender").agg(Map(
  //     "salary" -> "avg",
  //     "age"    -> "max"))
  scala> emp_df.groupBy($"deptno").agg(avg($"sal"), max($"comm")).show
  +------+------------------+---------+
  |deptno|          avg(sal)|max(comm)|
  +------+------------------+---------+
  |    10|2916.6666666666665|     null|
  |    20|            2175.0|     null|
  |    30|1566.6666666666667|   1400.0|
  +------+------------------+---------+
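
Note that the $"column" syntax used above relies on implicit conversions that spark-shell imports automatically. In a standalone application they must be imported from your own context first; a minimal sketch (the variable names are assumptions, not from the original post):

  import org.apache.spark.sql.functions.{avg, max}

  // sqlContext stands for whatever SQLContext/HiveContext the app created;
  // this import enables the $"col" column syntax outside the shell
  import sqlContext.implicits._

  emp_df.groupBy($"deptno").agg(avg($"sal"), max($"comm")).show()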

II. User-Defined Functions

  // Register a UDF
  scala> sqlContext.udf.register(
       |   "comm_trans",          // function name
       |   (comm: Double) => {    // anonymous function: intended to map null comm to 0.0
       |     if (comm.toString == "null") {
       |       0.0
       |     } else {
       |       comm
       |     }
       |   }
       | )
  res22: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,List(DoubleType))

  scala> sqlContext.sql("select empno, ename, sal, comm_trans(comm) cm from emp").show
  +-----+------+------+------+
  |empno| ename|   sal|    cm|
  +-----+------+------+------+
  | 7369| SMITH| 800.0|  null|
  | 7499| ALLEN|1600.0| 300.0|
  | 7521|  WARD|1250.0| 500.0|
  | 7566| JONES|2975.0|  null|
  | 7654|MARTIN|1250.0|1400.0|
  | 7698| BLAKE|2850.0|  null|
  | 7782| CLARK|2450.0|  null|
  | 7788| SCOTT|3000.0|  null|
  | 7839|  KING|5000.0|  null|
  | 7844|TURNER|1500.0|   0.0|
  | 7876| ADAMS|1100.0|  null|
  | 7900| JAMES| 950.0|  null|
  | 7902|  FORD|3000.0|  null|
  | 7934|MILLER|1300.0|  null|
  +-----+------+------+------+
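
As the output shows, the cm column still contains null: a UDF declared over the primitive Double type apparently never sees SQL NULLs (Spark produces null for those rows rather than calling the function), so the comm.toString == "null" check never fires. A more reliable way to substitute nulls, shown here as a sketch using the built-in coalesce rather than a UDF:

  scala> sqlContext.sql("select empno, ename, sal, coalesce(comm, 0.0) cm from emp").show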


  1. scala> sqlContext.sql("select deptno, avg(sal) avg_sal from emp group by deptno").show
  2. +------+------------------+
  3. |deptno| avg_sal|
  4. +------+------------------+
  5. | 10|2916.6666666666665|
  6. | 20| 2175.0|
  7. | 30|1566.6666666666667|
  8. +------+------------------+
The same per-department average can be reimplemented as a user-defined aggregate function (UDAF):

  package com.ibeifeng.bigdata.spark.app.sql

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._

  /**
   * Created by XuanYu on 2016/8/6.
   */
  object AvgSalUDAF extends UserDefinedAggregateFunction {

    /**
     * Name and type of the input field(s)
     */
    override def inputSchema: StructType = StructType(
      StructField("sal", DoubleType, true) :: Nil
    )

    /**
     * Names and types of the aggregation buffer fields, as the computation requires
     */
    override def bufferSchema: StructType = StructType(
      StructField("sal_total", DoubleType, true) ::
      StructField("sal_count", IntegerType, true) :: Nil
    )

    /**
     * Initialize the buffer fields
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0.0)  // running total of sal
      buffer.update(1, 0)    // number of rows seen
    }

    /**
     * Update the buffer with one input row
     */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      // current buffer values
      val salTotal = buffer.getDouble(0)
      val salCount = buffer.getInt(1)
      // incoming value
      val inputSal = input.getDouble(0)
      // write back the updated buffer values
      buffer.update(0, salTotal + inputSal)
      buffer.update(1, salCount + 1)
    }

    /**
     * Merges two aggregation buffers and stores the updated buffer values
     * back to `buffer1`. This is called when two partially aggregated
     * buffers are combined.
     */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val salTotal1 = buffer1.getDouble(0)
      val salCount1 = buffer1.getInt(1)
      val salTotal2 = buffer2.getDouble(0)
      val salCount2 = buffer2.getInt(1)
      buffer1.update(0, salTotal1 + salTotal2)
      buffer1.update(1, salCount1 + salCount2)
    }

    /**
     * Compute the final result from the buffer
     */
    override def evaluate(buffer: Row): Any = {
      val salTotal = buffer.getDouble(0)
      val salCount = buffer.getInt(1)
      salTotal / salCount
    }

    /**
     * Whether the function always returns the same result for the same input
     */
    override def deterministic: Boolean = true

    /**
     * Type of the returned value
     */
    override def dataType: DataType = DoubleType
  }
  // Register the UDAF under the name avg_sal
  sqlContext.udf.register(
    "avg_sal",
    AvgSalUDAF
  )
  1. scala> sqlContext.sql("select deptno, avg(sal) avg_sal, avg_sal(sal) as_self from emp group by deptno").show
  2. +------+------------------+------------------+
  3. |deptno| avg_sal| as_self|
  4. +------+------------------+------------------+
  5. | 10|2916.6666666666665|2916.6666666666665|
  6. | 20| 2175.0| 2175.0|
  7. | 30|1566.6666666666667|1566.6666666666667|
  8. +------+------------------+------------------+
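
The registered UDAF should also be usable from the DataFrame API by name. A minimal sketch (not from the original transcript; callUDF looks up a function registered under that name):

  import org.apache.spark.sql.functions.callUDF

  // same grouped aggregation, expressed through the DSL
  emp_df.groupBy($"deptno").agg(callUDF("avg_sal", $"sal").as("as_self")).show()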
  import org.apache.spark.sql.types._

  Data type      Value type in Scala         API to access or create the data type
  -------------  --------------------------  -------------------------------------
  ByteType       Byte                        ByteType
  ShortType      Short                       ShortType
  IntegerType    Int                         IntegerType
  LongType       Long                        LongType
  FloatType      Float                       FloatType
  DoubleType     Double                      DoubleType
  DecimalType    java.math.BigDecimal        DecimalType
  StringType     String                      StringType
  BinaryType     Array[Byte]                 BinaryType
  BooleanType    Boolean                     BooleanType
  TimestampType  java.sql.Timestamp          TimestampType
  DateType       java.sql.Date               DateType
  ArrayType      scala.collection.Seq        ArrayType(elementType, [containsNull])
                                             (containsNull defaults to true)
  MapType        scala.collection.Map        MapType(keyType, valueType, [valueContainsNull])
                                             (valueContainsNull defaults to true)
  StructType     org.apache.spark.sql.Row    StructType(fields)
                                             (fields is a Seq of StructFields; two fields
                                             with the same name are not allowed)
  StructField    the Scala value type of     StructField(name, dataType, nullable)
                 the field's data type
                 (e.g. Int for IntegerType)
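
To illustrate how these building blocks compose, here is a small sketch of a hand-built schema (the field names are made up for the example):

  import org.apache.spark.sql.types._

  // a hypothetical schema combining several of the types above
  val schema = StructType(
    StructField("name", StringType, false) ::
    StructField("scores", ArrayType(DoubleType, true), true) ::
    StructField("props", MapType(StringType, StringType, true), true) :: Nil
  )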

Window functions: ROW_NUMBER() OVER(partition and ordering spec) AS rank column

  // ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk assigns a row
  // number to every row. The spec inside OVER() controls the numbering: rows are
  // partitioned by deptno, each partition is sorted by sal in descending order,
  // and rows are numbered from 1 within their partition.
  SELECT
    empno, ename, sal, deptno,
    ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
  FROM
    emp;

  empno  ename   sal     deptno  rnk
  7839   KING    5000.0  10      1
  7782   CLARK   2450.0  10      2
  7934   MILLER  1300.0  10      3
  7788   SCOTT   3000.0  20      1
  7902   FORD    3000.0  20      2
  7566   JONES   2975.0  20      3
  7876   ADAMS   1100.0  20      4
  7369   SMITH   800.0   20      5
  7698   BLAKE   2850.0  30      1
  7499   ALLEN   1600.0  30      2
  7844   TURNER  1500.0  30      3
  7521   WARD    1250.0  30      4
  7654   MARTIN  1250.0  30      5
  7900   JAMES   950.0   30      6
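
A common follow-up (a sketch, not in the original post) is to keep only the top N rows per group by filtering on rnk in an outer query, e.g. the top three earners per department:

  SELECT empno, ename, sal, deptno, rnk
  FROM (
    SELECT
      empno, ename, sal, deptno,
      ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
    FROM emp
  ) t
  WHERE rnk <= 3;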