// 大数据Spark调用一例 — Big-data Spark invocation example (circle/crowd ranking job)
package com.huawei.bigdata.spark.rank;import com.huawei.hadoop.security.LoginUtil;import org.apache.hadoop.conf.Configuration;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.broadcast.Broadcast;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SQLContext;import org.apache.spark.sql.api.java.UDF2;import org.apache.spark.sql.api.java.UDF3;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.SparkSession;import java.io.InputStream;import java.math.BigDecimal;import java.text.SimpleDateFormat;import java.util.*;import com.mongodb.spark.MongoSpark;public class CircleCrowdRank { public static void main(String[] args) throws Exception { Properties props = new Properties(); //判断是本地还是集群,本地local,集群huawei String master=args[0]; SparkSession spark =null; Map<String, String> options = new HashMap<String,String>(); String bootstrapServers=""; if("local".equals(master)){ System.setProperty("hadoop.home.dir", "D:\\software\\hadoop_home_bin"); InputStream resourceAsStream = CircleCrowdRank.class.getResourceAsStream("/conf/localjdbc.properties"); props.load(resourceAsStream); System.out.println("设置集群运行master为local环境"); spark = SparkSession.builder() .master("local") .appName("userRank") .config("spark.mongodb.input.uri", props.getProperty("spark.mongodb.input.uri")) .config("spark.mongodb.input.database",props.getProperty("spark.mongodb.input.database")) .config("spark.mongodb.input.collection",props.getProperty("spark.mongodb.input.collection")) .config("spark.mongodb.output.uri",props.getProperty("spark.mongodb.output.uri")) .config("spark.mongodb.output.database",props.getProperty("spark.mongodb.output.database")) .config("spark.mongodb.output.collection",props.getProperty("spark.mongodb.output.collection")) .getOrCreate(); //连接mysql数据库的参数信息 options.put("url", props.getProperty("url")); options.put("driver", props.getProperty("driver")); 
options.put("user", props.getProperty("user")); options.put("password", props.getProperty("password")); } if("huawei".equals(master)){ InputStream resourceAsStream = CircleCrowdRank.class.getResourceAsStream("/conf/huaweijdbc.properties"); props.load(resourceAsStream); //设置用户名,和该用户的认证凭证/opt/userauthor/panpan_spark_ren String userPrincipal = props.getProperty("userPrincipal"); String userKeytabPath = props.getProperty("userKeytabPath"); String krb5ConfPath = props.getProperty("krb5ConfPath"); Configuration hadoopConf = new Configuration(); LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf); spark = SparkSession.builder() .appName("userRank") .config("spark.mongodb.input.uri", props.getProperty("spark.mongodb.input.uri")) .config("spark.mongodb.input.database",props.getProperty("spark.mongodb.input.database")) .config("spark.mongodb.input.collection",props.getProperty("spark.mongodb.input.collection")) .config("spark.mongodb.output.uri",props.getProperty("spark.mongodb.output.uri")) .config("spark.mongodb.output.database",props.getProperty("spark.mongodb.output.database")) .config("spark.mongodb.output.collection",props.getProperty("spark.mongodb.output.collection")) .getOrCreate(); //连接mysql数据库的参数信息 options.put("url", props.getProperty("url")); options.put("driver", props.getProperty("driver")); options.put("user", props.getProperty("user")); options.put("password", props.getProperty("password")); } JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); //sc.setLogLevel("WARN"); SQLContext sqlContext = new SQLContext(sc); //定义一个udf函数,判断是否为本user创建,如果是本user创建的,则活跃度增加100,为了实现排序,默认创建的排在首位 sqlContext.udf().register("updateActiveRate", new UDF3<Long,Long,Double,Double>() { private static final long serialVersionUID = 1L; @Override public Double call(Long userid, Long createdid, Double rate) throws Exception { if(rate==null){ rate=0.0; } if(userid==createdid){ rate=rate+100.0; } return rate; } }, DataTypes.DoubleType); 
//定义一个udf函数,计算群的等级,后期计算规则变化的话,在此修改 sqlContext.udf().register("getCrowdGrade", new UDF2<Long,Integer,Double>() { private static final long serialVersionUID = 1L; @Override public Double call(Long numPerCrowd, Integer totalUser) throws Exception { if(null==numPerCrowd){ numPerCrowd=0l; } if(null==totalUser){ return 0.0; } double totalnum=totalUser*1.0; double numper=numPerCrowd*1.0; double percent=numper/totalnum; BigDecimal b = new BigDecimal(percent); double f1 = b.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(); return f1; } }, DataTypes.DoubleType); //获取圈表 options.put("dbtable", "fl_circle"); Dataset<Row> flCircleDF = sqlContext.read().format("jdbc").options(options).load(); //注册圈表 flCircleDF.registerTempTable("fl_circle"); //获取圈与会员关系表 options.put("dbtable", "fl_member_and_circle"); Dataset<Row> memberAndCircleDF = sqlContext.read().format("jdbc").options(options).load(); //注册圈与会员关系表 memberAndCircleDF.registerTempTable("fl_member_and_circle"); //统计各个圈中的会员数量,把结果注册成一个临时表 Dataset<Row> circle_statistic=sqlContext.sql("select CIRCLE_ID,count(*) as num_per_circle from fl_member_and_circle where IS_DEL=0 GROUP BY CIRCLE_ID order by num_per_circle desc"); circle_statistic.registerTempTable("circle_statistic"); //获取圈中的会员数量的总和 Dataset<Row> sumary=sqlContext.sql("select sum(num_per_circle) as num from circle_statistic"); JavaRDD<Row> sumartRDD=sumary.javaRDD(); Object obj=sumartRDD.first().get(0); //把总和广播出去 Broadcast<String> circle_total_people=sc.broadcast(obj.toString()); //两次join //第一次join:圈表和上边得到的结果表join,此表包括圈的基本信息和每个圈的人数, Dataset<Row> circle_data_and_peoplenum=sqlContext.sql("select statistic.CIRCLE_ID,statistic.num_per_circle,temp.RC_ID,temp.CIRCLE_NAME,temp.CIRCLE_HEAD,temp.SCENE_TYPE from (select ID,RC_ID,CIRCLE_NAME,CIRCLE_HEAD,SCENE_TYPE from fl_circle where IS_DEL=0) as temp right join circle_statistic as statistic on statistic.CIRCLE_ID=temp.ID"); circle_data_and_peoplenum.registerTempTable("circle_data_and_peoplenum"); 
//第二次join:人员与圈关系表与上边的大表join,得到结果,,得到最终结果 Dataset<Row> circle_user_score_df=sqlContext.sql("select mc.MID,mc.CIRCLE_ID,cp.num_per_circle,ROUND(cp.num_per_circle/"+circle_total_people.value()+",9) as score,cp.RC_ID,cp.CIRCLE_NAME,cp.CIRCLE_HEAD from fl_member_and_circle as mc left join circle_data_and_peoplenum as cp on mc.CIRCLE_ID=cp.CIRCLE_ID order by score desc"); circle_user_score_df.registerTempTable("circle_user_score"); //分组取top N row_number() over (partition by order by desc),获得我的圈子排行中的根据等级排行 Dataset<Row> circle_user_score_order_result_df=sqlContext.sql("select MID,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD from (select MID,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD,row_number() over (partition by MID order by num_per_circle desc) as rank from circle_user_score ) t where t.rank<=10"); circle_user_score_order_result_df.registerTempTable("circle_user_score_order_result"); // (我的圈子根据等级排行)输出到mongodb中, MongoSpark.write(circle_user_score_order_result_df).option("collection", "circle_rank_grade_pri").mode("overwrite").save(); //根据活跃度的结果 //获取盈币变化的流水表 fl_pay_profit_coins_serial options.put("dbtable", "fl_pay_profit_coins_serial"); Dataset<Row> fl_pay_profit_coins_serial_df = sqlContext.read().format("jdbc").options(options).load(); //注册盈币变化的流水表 fl_pay_profit_coins_serial_df.registerTempTable("fl_pay_profit_coins_serial"); //筛选此表中内容,只计算头一天的盈币增加的用户,用户去重 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); Date date=new Date(); Calendar ca = Calendar.getInstance();// 得到一个Calendar的实例 ca.setTime(date); // 设置时间为当前时间 String end=sdf.format(date); ca.add(Calendar.DATE, -1);// 日期 Date resultDate = ca.getTime(); // 结果 String start=sdf.format(resultDate); //start和end必须广播出去 Broadcast<String> startTime = sc.broadcast(start); Broadcast<String> endTime = sc.broadcast(end); Dataset<Row> activite_users_df=sqlContext.sql("select DISTINCT M_ID from fl_pay_profit_coins_serial where CREATED BETWEEN '"+startTime.value()+"' and '"+endTime.value()+"' 
and TYPE=1"); //注册为活跃用户表 activite_users_df.registerTempTable("activite_users"); //join,得到每个圈子中的活跃用户的数量 Dataset<Row> activite_users_per_circle_df=sqlContext.sql("select CIRCLE_ID,count(*) as circle_active_num from (select au.M_ID,fmc.CIRCLE_ID from activite_users as au left join fl_member_and_circle as fmc on au.M_ID=fmc.MID) as temp group by temp.CIRCLE_ID"); activite_users_per_circle_df.registerTempTable("activite_users_per_circle"); //与circle_user_score_df的结果进行join,得到活跃度值的表, Dataset<Row> circle_active_rate = sqlContext.sql("select cus.MID,cus.CIRCLE_ID,cus.num_per_circle,cus.RC_ID,cus.CIRCLE_NAME,cus.CIRCLE_HEAD,aupc.circle_active_num,ROUND(aupc.circle_active_num/cus.num_per_circle,4) as active_rate from circle_user_score as cus left join activite_users_per_circle as aupc on cus.CIRCLE_ID=aupc.CIRCLE_ID"); Map<String,Object> valueMap=new HashMap<String,Object>(); valueMap.put("circle_active_num",0l); valueMap.put("active_rate",0); circle_active_rate=circle_active_rate.na().fill(valueMap); circle_active_rate.registerTempTable("circle_active_rate"); //分组取topN得到我的圈子排行,按照活跃度值排行 Dataset<Row> circle_active_result = sqlContext.sql("select MID,CIRCLE_ID,num_per_circle,RC_ID,CIRCLE_HEAD,circle_active_num,active_rate from (select MID,CIRCLE_ID,num_per_circle,RC_ID,RC_ID,CIRCLE_HEAD,circle_active_num,active_rate,row_number() over (partition by MID order by circle_active_num desc) as rank from circle_active_rate) t where t.rank<=10"); circle_active_result.registerTempTable("circle_active_result"); // (我的圈子根据活跃度排行)输出到mongodb中, MongoSpark.write(circle_active_result).option("collection", "circle_rank_activite_pri").mode("overwrite").save(); //处理获取圈子排行榜,根据场景排序,直接第一次join的结果来开窗函数(获取圈子排行榜,按照场景分) Dataset<Row> circle_scene_score_order_result_df=sqlContext.sql("select SCENE_TYPE,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD from (select SCENE_TYPE,CIRCLE_ID,num_per_circle,ROUND(num_per_circle/"+circle_total_people.value()+",9) as 
score,RC_id,CIRCLE_NAME,CIRCLE_HEAD,row_number() over (partition by SCENE_TYPE order by num_per_circle desc) as rank from circle_data_and_peoplenum ) t where t.rank<=10"); //把获取圈子排行榜的结果输出到文件中,以json格式保存,每个user输出10个 // (圈子根据等级排行,按照场景分)输出到mongodb中, MongoSpark.write(circle_scene_score_order_result_df).option("collection", "circle_rank_grade_pub").mode("overwrite").save(); //圈子结束,社群开始,社群根据活跃度排行, //获取群表 options.put("dbtable", "fl_crowds"); Dataset<Row> fl_crowds_df = sqlContext.read().format("jdbc").options(options).load(); //注册群表 fl_crowds_df.registerTempTable("fl_crowds"); //获取群与用户关系表 options.put("dbtable", "fl_member_and_crowd"); Dataset<Row> fl_member_and_crowd_df = sqlContext.read().format("jdbc").options(options).load(); //注册群与用户关系表 fl_member_and_crowd_df.registerTempTable("fl_member_and_crowd"); //获取每个群中的总人数 Dataset<Row> crowd_statistic_usertotal=sqlContext.sql("SELECT CROWD_ID,COUNT(*) as num_per_crowd from fl_member_and_crowd GROUP BY CROWD_ID"); //注册为临时表 crowd_statistic_usertotal.registerTempTable("crowd_statistic_usertotal"); //获取每个群中的总人数,获取每个群中的活跃用户数,群与用户关系表的两次join分别得到每个群中的总人数和每个群的活跃用户数 //第一次join,得到有活跃用户的群的活跃用户的数量与群的关系表 sqlContext.sql("select au.M_ID,mc.CROWD_ID from activite_users as au left join fl_member_and_crowd as mc on au.M_ID=mc.M_ID ").registerTempTable("crowd_activite"); Dataset<Row> crowd_statistic_activite=sqlContext.sql("select CROWD_ID,count(*) as activie_user_num from crowd_activite group by CROWD_ID"); crowd_statistic_activite.registerTempTable("crowd_statistic_activite"); //第二次join得到每个群中的总人数和活跃用户数 Dataset<Row> crowd_numuser_activiuser=sqlContext.sql("select csu.CROWD_ID,csu.num_per_crowd,csa.activie_user_num from crowd_statistic_usertotal as csu left join crowd_statistic_activite as csa on csu.CROWD_ID=csa.CROWD_ID order by activie_user_num"); Map<String,Object> valueMap2=new HashMap<String,Object>(); valueMap2.put("activie_user_num",0l); valueMap2.put("num_per_crowd",0l); 
crowd_numuser_activiuser=crowd_numuser_activiuser.na().fill(valueMap2); crowd_numuser_activiuser.registerTempTable("crowd_numuser_activiuser"); //第三次join,得到每个群中的详细信息、每个群中的活跃人数、总人数、每个群的活跃值 Dataset<Row> crowd_df=sqlContext.sql("select fc.CROWD_LOGO,fc.RC_ID,fc.CROWD_ID,fc.CROWD_NAME,fc.CREATED_BY,cna.num_per_crowd,cna.activie_user_num,Round(cna.activie_user_num/cna.num_per_crowd,9) as crowd_active_rate from fl_crowds as fc left join crowd_numuser_activiuser as cna on fc.CROWD_ID=cna.CROWD_ID"); Map<String,Object> valueMap3=new HashMap<String,Object>(); valueMap3.put("num_per_crowd",0l); valueMap3.put("activie_user_num",0l); valueMap3.put("crowd_active_rate",0.0); crowd_df=crowd_df.na().fill(valueMap3); crowd_df.registerTempTable("crowd_df"); //第四次join,把每个用户加入的群的信息添加进来,显示群的基本信息,群中mid,每个群的活跃度评分,针对是否为用户创建后的活跃度评分 Dataset<Row> crowd_activite_base_score=sqlContext.sql("select fmc.M_ID,fmc.CROWD_ID,cd.CROWD_LOGO,cd.RC_ID,cd.CROWD_NAME,cd.CREATED_BY,cd.num_per_crowd,cd.activie_user_num,cd.crowd_active_rate,updateActiveRate(fmc.M_ID,cd.CREATED_BY,cd.crowd_active_rate) as crowd_active_rate_after from fl_member_and_crowd as fmc left join crowd_df as cd on fmc.CROWD_ID=cd.CROWD_ID "); Map<String,Object> valueMap4=new HashMap<String,Object>(); valueMap3.put("num_per_crowd",0l); valueMap3.put("activie_user_num",0l); valueMap3.put("crowd_active_rate",0.0); crowd_df=crowd_df.na().fill(valueMap3); crowd_activite_base_score.registerTempTable("crowd_activite_base_score"); //计算所有社群的总人数 Dataset<Row> crowdSumary=sqlContext.sql("select sum(num_per_crowd) as num from crowd_activite_base_score"); JavaRDD<Row> crowdSumaryRDD=crowdSumary.javaRDD(); Long crow=(Long) crowdSumaryRDD.first().get(0); //把总和广播出去 Broadcast<Long> crowd_total_people=sc.broadcast(crow); //select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate,row_number() over (partition by 
M_ID order by crowd_active_rate desc) as rank from crowd_activite_base_score) t where t.rank<=10 Dataset<Row> crowd_base_grade_df = sqlContext.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,getCrowdGrade(num_per_crowd," + crowd_total_people.value() + ") as grade from crowd_activite_base_score"); crowd_base_grade_df.registerTempTable("crowd_base_grade"); Dataset<Row> crowd_grade_top_df = sqlContext.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,grade from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,grade,row_number() over (partition by M_ID order by grade desc) as rank from crowd_base_grade) t where t.rank<=10"); MongoSpark.write(crowd_grade_top_df).option("collection", "crowd_rank_grade").mode("overwrite").save(); //分组取topN,得到社群排行 Dataset<Row> crowd_rank_result=sqlContext.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate,row_number() over (partition by M_ID order by crowd_active_rate desc) as rank from crowd_activite_base_score) t where t.rank<=10"); crowd_rank_result.registerTempTable("crowd_rank_result"); // (社群排行,根据活跃度)输出到mongodb中, MongoSpark.write(crowd_rank_result).option("collection", "crowd_rank_active").mode("overwrite").save(); //查看圈子中最活跃的三个群 //注册全群关系表 fl_circle_crowds options.put("dbtable", "fl_circle_crowds"); Dataset<Row> fl_circle_crowds = sqlContext.read().format("jdbc").options(options).load(); //注册群与用户关系表 fl_circle_crowds.registerTempTable("fl_circle_crowds"); //社群排行中的第四次join的结果表与群圈关系表再进行join,得到用户,圈,群,群活跃度,的大表,根据是否为本用户创建修改圈活跃度的值,在已经算好的值的基础上增加100 Dataset<Row> crowd_activite_three=sqlContext.sql("select fcc.CIRCLE_ID,fcc.CROWDS_ID,cabs.M_ID,cabs.CROWD_LOGO,cabs.RC_ID,cabs.CROWD_NAME,cabs.CREATED_BY,cabs.num_per_crowd,cabs.activie_user_num,cabs.crowd_active_rate,cabs.crowd_active_rate_after from fl_circle_crowds as fcc left join 
crowd_activite_base_score as cabs on fcc.CROWDS_ID=cabs.CROWD_ID "); crowd_activite_three.registerTempTable("crowd_activite_three"); //分组取topn Dataset<Row> crowd_activite_three_result=sqlContext.sql("select CIRCLE_ID,CROWDS_ID,M_ID,CROWD_LOGO,RC_ID,CROWD_NAME,CREATED_BY,num_per_crowd,activie_user_num,crowd_active_rate,crowd_active_rate_after from (select CIRCLE_ID,CROWDS_ID,M_ID,CROWD_LOGO,RC_ID,CROWD_NAME,CREATED_BY,num_per_crowd,activie_user_num,crowd_active_rate,crowd_active_rate_after,row_number() over (partition by M_ID,CIRCLE_ID order by crowd_active_rate desc) as rank from crowd_activite_three) t where t.rank<=3"); // (社群排行,根据活跃度,活跃度前三)输出到mongodb中, MongoSpark.write(crowd_activite_three_result).option("collection", "crowd_rank_active_three").mode("overwrite").save(); sc.stop(); /* * * * * */ }}