[关闭]
@yanglfyangl 2018-07-23T06:09:44.000000Z 字数 15792 阅读 481

大数据 Spark 调用一例:基于 Spark SQL + JDBC + MongoDB Connector 计算圈子/社群排行榜(CircleCrowdRank)

  1. package com.huawei.bigdata.spark.rank;
  2. import com.huawei.hadoop.security.LoginUtil;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.broadcast.Broadcast;
  7. import org.apache.spark.sql.Dataset;
  8. import org.apache.spark.sql.Row;
  9. import org.apache.spark.sql.SQLContext;
  10. import org.apache.spark.sql.api.java.UDF2;
  11. import org.apache.spark.sql.api.java.UDF3;
  12. import org.apache.spark.sql.types.DataTypes;
  13. import org.apache.spark.sql.SparkSession;
  14. import java.io.InputStream;
  15. import java.math.BigDecimal;
  16. import java.text.SimpleDateFormat;
  17. import java.util.*;
  18. import com.mongodb.spark.MongoSpark;
  19. public class CircleCrowdRank {
  20. public static void main(String[] args) throws Exception {
  21. Properties props = new Properties();
  22. //判断是本地还是集群,本地local,集群huawei
  23. String master=args[0];
  24. SparkSession spark =null;
  25. Map<String, String> options = new HashMap<String,String>();
  26. String bootstrapServers="";
  27. if("local".equals(master)){
  28. System.setProperty("hadoop.home.dir", "D:\\software\\hadoop_home_bin");
  29. InputStream resourceAsStream = CircleCrowdRank.class.getResourceAsStream("/conf/localjdbc.properties");
  30. props.load(resourceAsStream);
  31. System.out.println("设置集群运行master为local环境");
  32. spark = SparkSession.builder()
  33. .master("local")
  34. .appName("userRank")
  35. .config("spark.mongodb.input.uri", props.getProperty("spark.mongodb.input.uri"))
  36. .config("spark.mongodb.input.database",props.getProperty("spark.mongodb.input.database"))
  37. .config("spark.mongodb.input.collection",props.getProperty("spark.mongodb.input.collection"))
  38. .config("spark.mongodb.output.uri",props.getProperty("spark.mongodb.output.uri"))
  39. .config("spark.mongodb.output.database",props.getProperty("spark.mongodb.output.database"))
  40. .config("spark.mongodb.output.collection",props.getProperty("spark.mongodb.output.collection"))
  41. .getOrCreate();
  42. //连接mysql数据库的参数信息
  43. options.put("url", props.getProperty("url"));
  44. options.put("driver", props.getProperty("driver"));
  45. options.put("user", props.getProperty("user"));
  46. options.put("password", props.getProperty("password"));
  47. }
  48. if("huawei".equals(master)){
  49. InputStream resourceAsStream = CircleCrowdRank.class.getResourceAsStream("/conf/huaweijdbc.properties");
  50. props.load(resourceAsStream);
  51. //设置用户名,和该用户的认证凭证/opt/userauthor/panpan_spark_ren
  52. String userPrincipal = props.getProperty("userPrincipal");
  53. String userKeytabPath = props.getProperty("userKeytabPath");
  54. String krb5ConfPath = props.getProperty("krb5ConfPath");
  55. Configuration hadoopConf = new Configuration();
  56. LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
  57. spark = SparkSession.builder()
  58. .appName("userRank")
  59. .config("spark.mongodb.input.uri", props.getProperty("spark.mongodb.input.uri"))
  60. .config("spark.mongodb.input.database",props.getProperty("spark.mongodb.input.database"))
  61. .config("spark.mongodb.input.collection",props.getProperty("spark.mongodb.input.collection"))
  62. .config("spark.mongodb.output.uri",props.getProperty("spark.mongodb.output.uri"))
  63. .config("spark.mongodb.output.database",props.getProperty("spark.mongodb.output.database"))
  64. .config("spark.mongodb.output.collection",props.getProperty("spark.mongodb.output.collection"))
  65. .getOrCreate();
  66. //连接mysql数据库的参数信息
  67. options.put("url", props.getProperty("url"));
  68. options.put("driver", props.getProperty("driver"));
  69. options.put("user", props.getProperty("user"));
  70. options.put("password", props.getProperty("password"));
  71. }
  72. JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
  73. //sc.setLogLevel("WARN");
  74. SQLContext sqlContext = new SQLContext(sc);
  75. //定义一个udf函数,判断是否为本user创建,如果是本user创建的,则活跃度增加100,为了实现排序,默认创建的排在首位
  76. sqlContext.udf().register("updateActiveRate", new UDF3<Long,Long,Double,Double>() {
  77. private static final long serialVersionUID = 1L;
  78. @Override
  79. public Double call(Long userid, Long createdid, Double rate) throws Exception {
  80. if(rate==null){
  81. rate=0.0;
  82. }
  83. if(userid==createdid){
  84. rate=rate+100.0;
  85. }
  86. return rate;
  87. }
  88. }, DataTypes.DoubleType);
  89. //定义一个udf函数,计算群的等级,后期计算规则变化的话,在此修改
  90. sqlContext.udf().register("getCrowdGrade", new UDF2<Long,Integer,Double>() {
  91. private static final long serialVersionUID = 1L;
  92. @Override
  93. public Double call(Long numPerCrowd, Integer totalUser) throws Exception {
  94. if(null==numPerCrowd){
  95. numPerCrowd=0l;
  96. }
  97. if(null==totalUser){
  98. return 0.0;
  99. }
  100. double totalnum=totalUser*1.0;
  101. double numper=numPerCrowd*1.0;
  102. double percent=numper/totalnum;
  103. BigDecimal b = new BigDecimal(percent);
  104. double f1 = b.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
  105. return f1;
  106. }
  107. }, DataTypes.DoubleType);
  108. //获取圈表
  109. options.put("dbtable", "fl_circle");
  110. Dataset<Row> flCircleDF = sqlContext.read().format("jdbc").options(options).load();
  111. //注册圈表
  112. flCircleDF.registerTempTable("fl_circle");
  113. //获取圈与会员关系表
  114. options.put("dbtable", "fl_member_and_circle");
  115. Dataset<Row> memberAndCircleDF = sqlContext.read().format("jdbc").options(options).load();
  116. //注册圈与会员关系表
  117. memberAndCircleDF.registerTempTable("fl_member_and_circle");
  118. //统计各个圈中的会员数量,把结果注册成一个临时表
  119. Dataset<Row> circle_statistic=sqlContext.sql("select CIRCLE_ID,count(*) as num_per_circle from fl_member_and_circle where IS_DEL=0 GROUP BY CIRCLE_ID order by num_per_circle desc");
  120. circle_statistic.registerTempTable("circle_statistic");
  121. //获取圈中的会员数量的总和
  122. Dataset<Row> sumary=sqlContext.sql("select sum(num_per_circle) as num from circle_statistic");
  123. JavaRDD<Row> sumartRDD=sumary.javaRDD();
  124. Object obj=sumartRDD.first().get(0);
  125. //把总和广播出去
  126. Broadcast<String> circle_total_people=sc.broadcast(obj.toString());
  127. //两次join
  128. //第一次join:圈表和上边得到的结果表join,此表包括圈的基本信息和每个圈的人数,
  129. Dataset<Row> circle_data_and_peoplenum=sqlContext.sql("select statistic.CIRCLE_ID,statistic.num_per_circle,temp.RC_ID,temp.CIRCLE_NAME,temp.CIRCLE_HEAD,temp.SCENE_TYPE from (select ID,RC_ID,CIRCLE_NAME,CIRCLE_HEAD,SCENE_TYPE from fl_circle where IS_DEL=0) as temp right join circle_statistic as statistic on statistic.CIRCLE_ID=temp.ID");
  130. circle_data_and_peoplenum.registerTempTable("circle_data_and_peoplenum");
  131. //第二次join:人员与圈关系表与上边的大表join,得到结果,,得到最终结果
  132. Dataset<Row> circle_user_score_df=sqlContext.sql("select mc.MID,mc.CIRCLE_ID,cp.num_per_circle,ROUND(cp.num_per_circle/"+circle_total_people.value()+",9) as score,cp.RC_ID,cp.CIRCLE_NAME,cp.CIRCLE_HEAD from fl_member_and_circle as mc left join circle_data_and_peoplenum as cp on mc.CIRCLE_ID=cp.CIRCLE_ID order by score desc");
  133. circle_user_score_df.registerTempTable("circle_user_score");
  134. //分组取top N row_number() over (partition by order by desc),获得我的圈子排行中的根据等级排行
  135. Dataset<Row> circle_user_score_order_result_df=sqlContext.sql("select MID,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD from (select MID,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD,row_number() over (partition by MID order by num_per_circle desc) as rank from circle_user_score ) t where t.rank<=10");
  136. circle_user_score_order_result_df.registerTempTable("circle_user_score_order_result");
  137. // (我的圈子根据等级排行)输出到mongodb中,
  138. MongoSpark.write(circle_user_score_order_result_df).option("collection", "circle_rank_grade_pri").mode("overwrite").save();
  139. //根据活跃度的结果
  140. //获取盈币变化的流水表 fl_pay_profit_coins_serial
  141. options.put("dbtable", "fl_pay_profit_coins_serial");
  142. Dataset<Row> fl_pay_profit_coins_serial_df = sqlContext.read().format("jdbc").options(options).load();
  143. //注册盈币变化的流水表
  144. fl_pay_profit_coins_serial_df.registerTempTable("fl_pay_profit_coins_serial");
  145. //筛选此表中内容,只计算头一天的盈币增加的用户,用户去重
  146. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
  147. Date date=new Date();
  148. Calendar ca = Calendar.getInstance();// 得到一个Calendar的实例
  149. ca.setTime(date); // 设置时间为当前时间
  150. String end=sdf.format(date);
  151. ca.add(Calendar.DATE, -1);// 日期
  152. Date resultDate = ca.getTime(); // 结果
  153. String start=sdf.format(resultDate);
  154. //start和end必须广播出去
  155. Broadcast<String> startTime = sc.broadcast(start);
  156. Broadcast<String> endTime = sc.broadcast(end);
  157. Dataset<Row> activite_users_df=sqlContext.sql("select DISTINCT M_ID from fl_pay_profit_coins_serial where CREATED BETWEEN '"+startTime.value()+"' and '"+endTime.value()+"' and TYPE=1");
  158. //注册为活跃用户表
  159. activite_users_df.registerTempTable("activite_users");
  160. //join,得到每个圈子中的活跃用户的数量
  161. Dataset<Row> activite_users_per_circle_df=sqlContext.sql("select CIRCLE_ID,count(*) as circle_active_num from (select au.M_ID,fmc.CIRCLE_ID from activite_users as au left join fl_member_and_circle as fmc on au.M_ID=fmc.MID) as temp group by temp.CIRCLE_ID");
  162. activite_users_per_circle_df.registerTempTable("activite_users_per_circle");
  163. //与circle_user_score_df的结果进行join,得到活跃度值的表,
  164. Dataset<Row> circle_active_rate = sqlContext.sql("select cus.MID,cus.CIRCLE_ID,cus.num_per_circle,cus.RC_ID,cus.CIRCLE_NAME,cus.CIRCLE_HEAD,aupc.circle_active_num,ROUND(aupc.circle_active_num/cus.num_per_circle,4) as active_rate from circle_user_score as cus left join activite_users_per_circle as aupc on cus.CIRCLE_ID=aupc.CIRCLE_ID");
  165. Map<String,Object> valueMap=new HashMap<String,Object>();
  166. valueMap.put("circle_active_num",0l);
  167. valueMap.put("active_rate",0);
  168. circle_active_rate=circle_active_rate.na().fill(valueMap);
  169. circle_active_rate.registerTempTable("circle_active_rate");
  170. //分组取topN得到我的圈子排行,按照活跃度值排行
  171. Dataset<Row> circle_active_result = sqlContext.sql("select MID,CIRCLE_ID,num_per_circle,RC_ID,CIRCLE_HEAD,circle_active_num,active_rate from (select MID,CIRCLE_ID,num_per_circle,RC_ID,RC_ID,CIRCLE_HEAD,circle_active_num,active_rate,row_number() over (partition by MID order by circle_active_num desc) as rank from circle_active_rate) t where t.rank<=10");
  172. circle_active_result.registerTempTable("circle_active_result");
  173. // (我的圈子根据活跃度排行)输出到mongodb中,
  174. MongoSpark.write(circle_active_result).option("collection", "circle_rank_activite_pri").mode("overwrite").save();
  175. //处理获取圈子排行榜,根据场景排序,直接第一次join的结果来开窗函数(获取圈子排行榜,按照场景分)
  176. Dataset<Row> circle_scene_score_order_result_df=sqlContext.sql("select SCENE_TYPE,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD from (select SCENE_TYPE,CIRCLE_ID,num_per_circle,ROUND(num_per_circle/"+circle_total_people.value()+",9) as score,RC_id,CIRCLE_NAME,CIRCLE_HEAD,row_number() over (partition by SCENE_TYPE order by num_per_circle desc) as rank from circle_data_and_peoplenum ) t where t.rank<=10");
  177. //把获取圈子排行榜的结果输出到文件中,以json格式保存,每个user输出10个
  178. // (圈子根据等级排行,按照场景分)输出到mongodb中,
  179. MongoSpark.write(circle_scene_score_order_result_df).option("collection", "circle_rank_grade_pub").mode("overwrite").save();
  180. //圈子结束,社群开始,社群根据活跃度排行,
  181. //获取群表
  182. options.put("dbtable", "fl_crowds");
  183. Dataset<Row> fl_crowds_df = sqlContext.read().format("jdbc").options(options).load();
  184. //注册群表
  185. fl_crowds_df.registerTempTable("fl_crowds");
  186. //获取群与用户关系表
  187. options.put("dbtable", "fl_member_and_crowd");
  188. Dataset<Row> fl_member_and_crowd_df = sqlContext.read().format("jdbc").options(options).load();
  189. //注册群与用户关系表
  190. fl_member_and_crowd_df.registerTempTable("fl_member_and_crowd");
  191. //获取每个群中的总人数
  192. Dataset<Row> crowd_statistic_usertotal=sqlContext.sql("SELECT CROWD_ID,COUNT(*) as num_per_crowd from fl_member_and_crowd GROUP BY CROWD_ID");
  193. //注册为临时表
  194. crowd_statistic_usertotal.registerTempTable("crowd_statistic_usertotal");
  195. //获取每个群中的总人数,获取每个群中的活跃用户数,群与用户关系表的两次join分别得到每个群中的总人数和每个群的活跃用户数
  196. //第一次join,得到有活跃用户的群的活跃用户的数量与群的关系表
  197. sqlContext.sql("select au.M_ID,mc.CROWD_ID from activite_users as au left join fl_member_and_crowd as mc on au.M_ID=mc.M_ID ").registerTempTable("crowd_activite");
  198. Dataset<Row> crowd_statistic_activite=sqlContext.sql("select CROWD_ID,count(*) as activie_user_num from crowd_activite group by CROWD_ID");
  199. crowd_statistic_activite.registerTempTable("crowd_statistic_activite");
  200. //第二次join得到每个群中的总人数和活跃用户数
  201. Dataset<Row> crowd_numuser_activiuser=sqlContext.sql("select csu.CROWD_ID,csu.num_per_crowd,csa.activie_user_num from crowd_statistic_usertotal as csu left join crowd_statistic_activite as csa on csu.CROWD_ID=csa.CROWD_ID order by activie_user_num");
  202. Map<String,Object> valueMap2=new HashMap<String,Object>();
  203. valueMap2.put("activie_user_num",0l);
  204. valueMap2.put("num_per_crowd",0l);
  205. crowd_numuser_activiuser=crowd_numuser_activiuser.na().fill(valueMap2);
  206. crowd_numuser_activiuser.registerTempTable("crowd_numuser_activiuser");
  207. //第三次join,得到每个群中的详细信息、每个群中的活跃人数、总人数、每个群的活跃值
  208. Dataset<Row> crowd_df=sqlContext.sql("select fc.CROWD_LOGO,fc.RC_ID,fc.CROWD_ID,fc.CROWD_NAME,fc.CREATED_BY,cna.num_per_crowd,cna.activie_user_num,Round(cna.activie_user_num/cna.num_per_crowd,9) as crowd_active_rate from fl_crowds as fc left join crowd_numuser_activiuser as cna on fc.CROWD_ID=cna.CROWD_ID");
  209. Map<String,Object> valueMap3=new HashMap<String,Object>();
  210. valueMap3.put("num_per_crowd",0l);
  211. valueMap3.put("activie_user_num",0l);
  212. valueMap3.put("crowd_active_rate",0.0);
  213. crowd_df=crowd_df.na().fill(valueMap3);
  214. crowd_df.registerTempTable("crowd_df");
  215. //第四次join,把每个用户加入的群的信息添加进来,显示群的基本信息,群中mid,每个群的活跃度评分,针对是否为用户创建后的活跃度评分
  216. Dataset<Row> crowd_activite_base_score=sqlContext.sql("select fmc.M_ID,fmc.CROWD_ID,cd.CROWD_LOGO,cd.RC_ID,cd.CROWD_NAME,cd.CREATED_BY,cd.num_per_crowd,cd.activie_user_num,cd.crowd_active_rate,updateActiveRate(fmc.M_ID,cd.CREATED_BY,cd.crowd_active_rate) as crowd_active_rate_after from fl_member_and_crowd as fmc left join crowd_df as cd on fmc.CROWD_ID=cd.CROWD_ID ");
  217. Map<String,Object> valueMap4=new HashMap<String,Object>();
  218. valueMap3.put("num_per_crowd",0l);
  219. valueMap3.put("activie_user_num",0l);
  220. valueMap3.put("crowd_active_rate",0.0);
  221. crowd_df=crowd_df.na().fill(valueMap3);
  222. crowd_activite_base_score.registerTempTable("crowd_activite_base_score");
  223. //计算所有社群的总人数
  224. Dataset<Row> crowdSumary=sqlContext.sql("select sum(num_per_crowd) as num from crowd_activite_base_score");
  225. JavaRDD<Row> crowdSumaryRDD=crowdSumary.javaRDD();
  226. Long crow=(Long) crowdSumaryRDD.first().get(0);
  227. //把总和广播出去
  228. Broadcast<Long> crowd_total_people=sc.broadcast(crow);
  229. //select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate,row_number() over (partition by M_ID order by crowd_active_rate desc) as rank from crowd_activite_base_score) t where t.rank<=10
  230. Dataset<Row> crowd_base_grade_df = sqlContext.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,getCrowdGrade(num_per_crowd," + crowd_total_people.value() + ") as grade from crowd_activite_base_score");
  231. crowd_base_grade_df.registerTempTable("crowd_base_grade");
  232. Dataset<Row> crowd_grade_top_df = sqlContext.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,grade from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,grade,row_number() over (partition by M_ID order by grade desc) as rank from crowd_base_grade) t where t.rank<=10");
  233. MongoSpark.write(crowd_grade_top_df).option("collection", "crowd_rank_grade").mode("overwrite").save();
  234. //分组取topN,得到社群排行
  235. Dataset<Row> crowd_rank_result=sqlContext.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate,row_number() over (partition by M_ID order by crowd_active_rate desc) as rank from crowd_activite_base_score) t where t.rank<=10");
  236. crowd_rank_result.registerTempTable("crowd_rank_result");
  237. // (社群排行,根据活跃度)输出到mongodb中,
  238. MongoSpark.write(crowd_rank_result).option("collection", "crowd_rank_active").mode("overwrite").save();
  239. //查看圈子中最活跃的三个群
  240. //注册全群关系表 fl_circle_crowds
  241. options.put("dbtable", "fl_circle_crowds");
  242. Dataset<Row> fl_circle_crowds = sqlContext.read().format("jdbc").options(options).load();
  243. //注册群与用户关系表
  244. fl_circle_crowds.registerTempTable("fl_circle_crowds");
  245. //社群排行中的第四次join的结果表与群圈关系表再进行join,得到用户,圈,群,群活跃度,的大表,根据是否为本用户创建修改圈活跃度的值,在已经算好的值的基础上增加100
  246. Dataset<Row> crowd_activite_three=sqlContext.sql("select fcc.CIRCLE_ID,fcc.CROWDS_ID,cabs.M_ID,cabs.CROWD_LOGO,cabs.RC_ID,cabs.CROWD_NAME,cabs.CREATED_BY,cabs.num_per_crowd,cabs.activie_user_num,cabs.crowd_active_rate,cabs.crowd_active_rate_after from fl_circle_crowds as fcc left join crowd_activite_base_score as cabs on fcc.CROWDS_ID=cabs.CROWD_ID ");
  247. crowd_activite_three.registerTempTable("crowd_activite_three");
  248. //分组取topn
  249. Dataset<Row> crowd_activite_three_result=sqlContext.sql("select CIRCLE_ID,CROWDS_ID,M_ID,CROWD_LOGO,RC_ID,CROWD_NAME,CREATED_BY,num_per_crowd,activie_user_num,crowd_active_rate,crowd_active_rate_after from (select CIRCLE_ID,CROWDS_ID,M_ID,CROWD_LOGO,RC_ID,CROWD_NAME,CREATED_BY,num_per_crowd,activie_user_num,crowd_active_rate,crowd_active_rate_after,row_number() over (partition by M_ID,CIRCLE_ID order by crowd_active_rate desc) as rank from crowd_activite_three) t where t.rank<=3");
  250. // (社群排行,根据活跃度,活跃度前三)输出到mongodb中,
  251. MongoSpark.write(crowd_activite_three_result).option("collection", "crowd_rank_active_three").mode("overwrite").save();
  252. sc.stop();
  253. /*
  254. *
  255. *
  256. *
  257. * */
  258. }
  259. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注