
Spark Core Notes

spark


takeOrdered(n): returns the n smallest elements of the RDD, i.e. ascending order (for a pair RDD the default tuple ordering compares _._1 first)
top(n): returns the n largest elements, i.e. descending order (again by _._1 for a pair RDD)
——————————————————————————————————————
top(3)(OrderingUtils.SecondValueOrdering): sort with a custom ordering; here the ordering compares _._2, so this returns the 3 elements with the largest second value (descending by _._2)

import scala.math.Ordering

object OrderingUtils {
  object SecondValueOrdering extends Ordering[(String, Int)] {
    override def compare(x: (String, Int), y: (String, Int)): Int = {
      x._2.compare(y._2)
      // equivalent infix form: x._2 compare y._2 (same idea as 1 to 10 vs 1.to(10))
    }
  }
}
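
A minimal usage sketch (illustrative only; wordCounts is a hypothetical pair RDD of (word, count)):

val wordCounts = sc.parallelize(Seq(("aa", 3), ("bb", 7), ("cc", 5), ("dd", 1)))
wordCounts.top(3)(OrderingUtils.SecondValueOrdering)
// returns the 3 pairs with the largest second value, descending by _._2:
// Array((bb,7), (cc,5), (aa,3))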
————————————————————————————————————————

rdd.partitions.length: check how many partitions the RDD has
rdd.count: triggers a job; inspect it in the web UI on port 4040
Each partition is processed by exactly one task, so the number of tasks shown for that job in the 4040 UI equals the number of partitions

Inspect the RDD's lineage (dependencies):
rdd.toDebugString
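
A minimal sketch tying these checks together (illustrative; the partition count is arbitrary):

val demo = sc.parallelize(1 to 100, 4)  // explicitly request 4 partitions
demo.partitions.length                  // 4
demo.count                              // triggers one job; the 4040 UI shows 4 tasks, one per partition
println(demo.toDebugString)             // prints the lineage (parent RDDs and their partition counts)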

distinct: deduplicate on the map side first (e.g. list.distinct within each partition), then use rdd.distinct for the global, reduce-side deduplication; otherwise the shuffle can cause data skew.
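
A minimal sketch of this map-side-first pattern (illustrative; lines is a hypothetical RDD of strings):

val deduped = lines
  .mapPartitions(iter => iter.toList.distinct.iterator)  // local dedup inside each partition (map side)
  .distinct()                                            // global dedup across partitions (shuffle / reduce side)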

——————————————————————————————————————————————
Requirement: the left table is the result table (previously accumulated data) and the right table is the new data; find the newly added records in the new data.

leftOuterJoin

scala> val rdd1 = sc.parallelize(List("aa","bb","cc","dd")).map((_,1))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:27

scala> val rdd2 = sc.parallelize(List("cc","dd","ee","ff")).map((_,1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:27

scala> val rdd = rdd2.leftOuterJoin(rdd1)
rdd: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[8] at leftOuterJoin at <console>:31

scala> rdd.collect
res0: Array[(String, (Int, Option[Int]))] = Array((ee,(1,None)), (dd,(1,Some(1))), (ff,(1,None)), (cc,(1,Some(1))))

Scoped to the left side of the join, take the records whose key exists in both tables:
scala> rdd.filter(tuple => !tuple._2._2.isEmpty).collect
res1: Array[(String, (Int, Option[Int]))] = Array((dd,(1,Some(1))), (cc,(1,Some(1))))
This has the same effect as an inner (equi-)join:
scala> val rdd3 = rdd2.join(rdd1)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:31

scala> val rdd3 = rdd2.join(rdd1).collect
rdd3: Array[(String, (Int, Int))] = Array((dd,(1,1)), (cc,(1,1)))

This gives the required result: the new records are those whose _._2._2 is None, i.e. whose key has no match in the accumulated result table.
Scoped to the left side of the join, take the records whose key exists on the left side but not on the right side:
scala> rdd.filter(tuple => tuple._2._2.isEmpty).collect
res2: Array[(String, (Int, Option[Int]))] = Array((ee,(1,None)), (ff,(1,None)))


repartition(n): repartition the RDD into n partitions (a count greater than 2 in this example); it always incurs a full shuffle

coalesce: to revisit later (it merges existing partitions and avoids a shuffle by default, so it is normally used to reduce the partition count)
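
A minimal sketch of the two operations (illustrative):

val more  = rdd.repartition(8)  // always shuffles; can raise or lower the partition count
val fewer = rdd.coalesce(2)     // no shuffle by default; merges existing partitions, so it only lowers the count
(more.partitions.length, fewer.partitions.length)  // (8, 2), assuming rdd started with at least 2 partitions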


countByKey: counts how many times each key occurs. It operates on an RDD of key-value pairs and returns a Map of (key, count) pairs to the driver.
It can also be used to implement word count:
rdd.flatMap(_.split("\t")).map((_, 1)).countByKey
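
A small sketch of what countByKey returns (illustrative):

val pairs = sc.parallelize(Seq(("aa", 1), ("bb", 1), ("aa", 1)))
pairs.countByKey()  // Map(aa -> 2, bb -> 1): a scala.collection.Map[String, Long] on the driver, not an RDD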

————————————————————————————————————————————————
Requirement: group by key, then take the largest few values per key, listed from largest to smallest.

group top key

First inspect the input RDD's format:
scala> rdd.collect
res20: Array[String] = Array(a 78, bb 98, aa 80, cc 98, aa 69, cc 87, bb 97, cc 86, aa 97, bb 78, bb 34, cc 85, bb 92, cc 72, bb 32, bb 23)

scala> rdd.map(_.split(" ")).collect
res2: Array[Array[String]] = Array(Array(a, 78), Array(bb, 98), Array(aa, 80), Array(cc, 98), Array(aa, 69), Array(cc, 87), Array(bb, 97), Array(cc, 86), Array(aa, 97), Array(bb, 78), Array(bb, 34), Array(cc, 85), Array(bb, 92), Array(cc, 72), Array(bb, 32), Array(bb, 23))

Convert the second field to Int:
scala> rdd.map(_.split(" ")).map(arr=>(arr(0),arr(1).toInt)).collect
res6: Array[(String, Int)] = Array((a,78), (bb,98), (aa,80), (cc,98), (aa,69), (cc,87), (bb,97), (cc,86), (aa,97), (bb,78), (bb,34), (cc,85), (bb,92), (cc,72), (bb,32), (bb,23))

Inspect the groupByKey output:
scala> rdd.map(_.split(" ")).map(arr=>(arr(0),arr(1).toInt)).groupByKey().collect
res8: Array[(String, Iterable[Int])] = Array((aa,CompactBuffer(80, 69, 97)), (bb,CompactBuffer(98, 97, 78, 34, 92, 32, 23)), (cc,CompactBuffer(98, 87, 86, 85, 72)), (a,CompactBuffer(78)))

Full per-key sort, ascending (smallest to largest):
scala> rdd.map(_.split(" ")).map(arr=>(arr(0),arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted)).collect
res10: Array[(String, List[Int])] = Array((aa,List(69, 80, 97)), (bb,List(23, 32, 34, 78, 92, 97, 98)), (cc,List(72, 85, 86, 87, 98)), (a,List(78)))

Take the largest 3 per key (takeRight(3) on the ascending list, so they are still listed in ascending order):
scala> rdd.map(_.split(" ")).map(arr=>(arr(0),arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3))).collect
res18: Array[(String, List[Int])] = Array((aa,List(69, 80, 97)), (bb,List(92, 97, 98)), (cc,List(86, 87, 98)), (a,List(78)))

Take the largest 3 per key and reverse them, so each list runs from largest to smallest (the required output):
scala> rdd.map(_.split(" ")).map(arr=>(arr(0),arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse)).collect
res24: Array[(String, List[Int])] = Array((aa,List(97, 80, 69)), (bb,List(98, 97, 92)), (cc,List(98, 87, 86)), (a,List(78)))

Take the smallest 3 per key, ascending:
scala> rdd.map(_.split(" ")).map(arr=>(arr(0),arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.take(3))).collect
res19: Array[(String, List[Int])] = Array((aa,List(69, 80, 97)), (bb,List(23, 32, 34)), (cc,List(72, 85, 86)), (a,List(78)))

————————————————————————————————————————————
Driver responsibilities: 1. requests resources from the Master (standalone) or the ApplicationMaster (YARN) in order to launch executors; 2. holds the SparkContext, the entry point for creating RDDs; 3. schedules jobs.

Broadcast variables
val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
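
A small sketch of reading the broadcast variable inside a task (illustrative):

// each executor receives one read-only copy; tasks access it through .value
sc.parallelize(Seq(10, 20)).map(x => x + broadcastVar.value.sum).collect()  // Array(16, 26)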

Accumulators (a custom accumulator will be used later in the project; see the sketch after the example below)
val accum = sc.accumulator(0, "My accumulator")
sc.parallelize(Array(1,2,3,4,5)).foreach( x => accum += x)
accum.value
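
Since a custom accumulator is mentioned, here is a minimal sketch using the Spark 1.x AccumulatorParam API (names are hypothetical, illustrative only):

import org.apache.spark.AccumulatorParam

// concatenates strings; zero() defines the empty value, addInPlace() merges two partial results
object StringConcatParam extends AccumulatorParam[String] {
  override def zero(initialValue: String): String = ""
  override def addInPlace(s1: String, s2: String): String =
    if (s1.isEmpty) s2 else s1 + "," + s2
}

val strAccum = sc.accumulator("")(StringConcatParam)
sc.parallelize(Array("a", "b", "c")).foreach(x => strAccum += x)
strAccum.value  // e.g. "a,b,c" (element order depends on task scheduling)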

In production, cluster deploy mode is used; what we use here is client mode, from the REPL.
--deploy-mode cluster: the output is printed on the driver, which runs inside the cluster, so the result can only be seen there (not in the local console).

————————————————————————————————————————————————
A third way to add external jars: write a submit script, for example:

#!/bin/sh

## SPARK_HOME
SPARK_HOME=/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6

## SPARK CLASSPATH
SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar

${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar

————————————————————————————————————————————————
Running on YARN does not involve the standalone Master/Worker daemons (they can be shut down); YARN itself must be started (start-yarn.sh).

In client mode the ApplicationMaster and the driver are separate processes.

jps
7160 ResourceManager
6722 HistoryServer
7275 NodeManager
8270 SparkSubmit (this is the driver program)
4801 Launcher
8871 Jps
3396 RemoteMavenServer
8625 CoarseGrainedExecutorBackend (the Executor process)
2857 DataNode
7672 NameNode
8555 ExecutorLauncher (this is the ApplicationMaster)
4278 NailgunRunner
3274 Main

In cluster mode the ApplicationMaster and the driver run in the same process.
bin/spark-submit --master yarn --deploy-mode cluster --class <main-class> <application-jar>
bin/spark-submit --master yarn --deploy-mode cluster --class com.ibeifeng.bigdata.spark.app.core.LogAnalyzer /home/beifeng/log_analyzer_jar/log-analyzer.jar

sparkConf.set("key", "value"): pass configuration parameters as key-value pairs
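
A minimal sketch of setting parameters on SparkConf before creating the SparkContext (illustrative; the key and values are only examples):

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("LogAnalyzer")           // name shown in the web UI
  .set("spark.executor.memory", "1g")  // example of passing a key-value parameter
val sc = new SparkContext(sparkConf)   // in a standalone application (the spark-shell already provides sc)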

——————————————————————————————————————————————————
