// 大数据Spark调用一例 (Example of a Big Data Spark job)
package com.huawei.bigdata.spark.rank;
import com.huawei.hadoop.security.LoginUtil;
import com.mongodb.spark.MongoSpark;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
 * Batch job that ranks circles (圈子) and crowds (社群) for every member and
 * publishes each ranking to a MongoDB collection.
 *
 * <p>Pipeline: source tables are loaded from MySQL over JDBC, registered as
 * temp views, ranked with Spark SQL window functions
 * ({@code row_number() over (partition by ...)}), and written out with
 * {@link MongoSpark}. Output collections:
 * <ul>
 *   <li>circle_rank_grade_pri    — top-10 circles per member by size</li>
 *   <li>circle_rank_activite_pri — top-10 circles per member by activity</li>
 *   <li>circle_rank_grade_pub    — top-10 circles per scene type by size</li>
 *   <li>crowd_rank_grade         — top-10 crowds per member by grade</li>
 *   <li>crowd_rank_active        — top-10 crowds per member by activity</li>
 *   <li>crowd_rank_active_three  — top-3 crowds per (member, circle) by activity</li>
 * </ul>
 *
 * <p>args[0] selects the environment: {@code "local"} for a developer machine
 * or {@code "huawei"} for the Kerberos-secured cluster.
 */
public class CircleCrowdRank {

    public static void main(String[] args) throws Exception {
        // 判断是本地还是集群,本地local,集群huawei
        String master = args[0];
        SparkSession spark;
        Map<String, String> options;
        if ("local".equals(master)) {
            System.setProperty("hadoop.home.dir", "D:\\software\\hadoop_home_bin");
            Properties props = loadProperties("/conf/localjdbc.properties");
            System.out.println("设置集群运行master为local环境");
            spark = buildSparkSession(props, "local");
            options = jdbcOptions(props);
        } else if ("huawei".equals(master)) {
            Properties props = loadProperties("/conf/huaweijdbc.properties");
            // Kerberos login: principal + keytab (e.g. /opt/userauthor/panpan_spark_ren)
            String userPrincipal = props.getProperty("userPrincipal");
            String userKeytabPath = props.getProperty("userKeytabPath");
            String krb5ConfPath = props.getProperty("krb5ConfPath");
            Configuration hadoopConf = new Configuration();
            LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
            // Master is supplied by spark-submit in cluster mode.
            spark = buildSparkSession(props, null);
            options = jdbcOptions(props);
        } else {
            // Fail fast with a clear message instead of NPE-ing later on a null session.
            throw new IllegalArgumentException(
                    "Unknown master '" + master + "': expected 'local' or 'huawei'");
        }
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        //sc.setLogLevel("WARN");

        // UDF: if the member created the crowd, boost its activity score by 100 so
        // self-created crowds always rank first for that member.
        spark.udf().register("updateActiveRate", new UDF3<Long, Long, Double, Double>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Double call(Long userid, Long createdid, Double rate) throws Exception {
                double result = (rate == null) ? 0.0 : rate;
                // Must compare boxed Longs with equals(): '==' only holds for values
                // inside the JVM's Long cache (-128..127), so real ids compared false.
                if (Objects.equals(userid, createdid)) {
                    result += 100.0;
                }
                return result;
            }
        }, DataTypes.DoubleType);

        // UDF: crowd grade = crowd size / total members, rounded half-up to 2 decimals.
        // Adjust here if the grading rule changes later.
        spark.udf().register("getCrowdGrade", new UDF2<Long, Integer, Double>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Double call(Long numPerCrowd, Integer totalUser) throws Exception {
                if (totalUser == null) {
                    return 0.0;
                }
                double numper = (numPerCrowd == null) ? 0.0 : numPerCrowd;
                double percent = numper / totalUser;
                return new BigDecimal(percent).setScale(2, RoundingMode.HALF_UP).doubleValue();
            }
        }, DataTypes.DoubleType);

        // ---------- circle (圈子) rankings ----------
        // Circle base table.
        options.put("dbtable", "fl_circle");
        Dataset<Row> flCircleDF = spark.read().format("jdbc").options(options).load();
        flCircleDF.createOrReplaceTempView("fl_circle");
        // Member <-> circle relation table.
        options.put("dbtable", "fl_member_and_circle");
        Dataset<Row> memberAndCircleDF = spark.read().format("jdbc").options(options).load();
        memberAndCircleDF.createOrReplaceTempView("fl_member_and_circle");
        // Member count per circle (deleted relations excluded).
        Dataset<Row> circle_statistic = spark.sql("select CIRCLE_ID,count(*) as num_per_circle from fl_member_and_circle where IS_DEL=0 GROUP BY CIRCLE_ID order by num_per_circle desc");
        circle_statistic.createOrReplaceTempView("circle_statistic");
        // Grand total of circle memberships, collected to the driver so it can be
        // inlined into later SQL. (The former Broadcast was pointless: .value()
        // was only ever read on the driver while building the query string.)
        String circleTotalPeople =
                spark.sql("select sum(num_per_circle) as num from circle_statistic")
                        .javaRDD().first().get(0).toString();
        // Join 1: circle base data + per-circle member count.
        Dataset<Row> circle_data_and_peoplenum = spark.sql("select statistic.CIRCLE_ID,statistic.num_per_circle,temp.RC_ID,temp.CIRCLE_NAME,temp.CIRCLE_HEAD,temp.SCENE_TYPE from (select ID,RC_ID,CIRCLE_NAME,CIRCLE_HEAD,SCENE_TYPE from fl_circle where IS_DEL=0) as temp right join circle_statistic as statistic on statistic.CIRCLE_ID=temp.ID");
        circle_data_and_peoplenum.createOrReplaceTempView("circle_data_and_peoplenum");
        // Join 2: member-circle relations + the stats above -> per-member circle score.
        Dataset<Row> circle_user_score_df = spark.sql("select mc.MID,mc.CIRCLE_ID,cp.num_per_circle,ROUND(cp.num_per_circle/" + circleTotalPeople + ",9) as score,cp.RC_ID,cp.CIRCLE_NAME,cp.CIRCLE_HEAD from fl_member_and_circle as mc left join circle_data_and_peoplenum as cp on mc.CIRCLE_ID=cp.CIRCLE_ID order by score desc");
        circle_user_score_df.createOrReplaceTempView("circle_user_score");
        // Top 10 circles per member by size: row_number() over (partition by MID ...).
        Dataset<Row> circle_user_score_order_result_df = spark.sql("select MID,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD from (select MID,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD,row_number() over (partition by MID order by num_per_circle desc) as rank from circle_user_score ) t where t.rank<=10");
        circle_user_score_order_result_df.createOrReplaceTempView("circle_user_score_order_result");
        // (我的圈子根据等级排行) write to MongoDB.
        MongoSpark.write(circle_user_score_order_result_df).option("collection", "circle_rank_grade_pri").mode("overwrite").save();

        // ---------- activity-based circle ranking ----------
        // Profit-coin change log; members whose coins increased yesterday count as active.
        options.put("dbtable", "fl_pay_profit_coins_serial");
        Dataset<Row> fl_pay_profit_coins_serial_df = spark.read().format("jdbc").options(options).load();
        fl_pay_profit_coins_serial_df.createOrReplaceTempView("fl_pay_profit_coins_serial");
        // Activity window [yesterday, today) in yyyy-MM-dd. java.time replaces the
        // non-thread-safe SimpleDateFormat/Calendar combo; the values are only used
        // on the driver to build SQL, so no broadcast is needed.
        LocalDate today = LocalDate.now();
        String end = today.format(DateTimeFormatter.ISO_LOCAL_DATE);
        String start = today.minusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE);
        // Distinct active members (TYPE=1 = coin increase — TODO confirm against schema).
        Dataset<Row> activite_users_df = spark.sql("select DISTINCT M_ID from fl_pay_profit_coins_serial where CREATED BETWEEN '" + start + "' and '" + end + "' and TYPE=1");
        activite_users_df.createOrReplaceTempView("activite_users");
        // Number of active members per circle.
        Dataset<Row> activite_users_per_circle_df = spark.sql("select CIRCLE_ID,count(*) as circle_active_num from (select au.M_ID,fmc.CIRCLE_ID from activite_users as au left join fl_member_and_circle as fmc on au.M_ID=fmc.MID) as temp group by temp.CIRCLE_ID");
        activite_users_per_circle_df.createOrReplaceTempView("activite_users_per_circle");
        // Activity rate = active members / total members per circle.
        Dataset<Row> circle_active_rate = spark.sql("select cus.MID,cus.CIRCLE_ID,cus.num_per_circle,cus.RC_ID,cus.CIRCLE_NAME,cus.CIRCLE_HEAD,aupc.circle_active_num,ROUND(aupc.circle_active_num/cus.num_per_circle,4) as active_rate from circle_user_score as cus left join activite_users_per_circle as aupc on cus.CIRCLE_ID=aupc.CIRCLE_ID");
        // Circles with no active members come back null from the left join.
        Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("circle_active_num", 0L);
        valueMap.put("active_rate", 0.0);
        circle_active_rate = circle_active_rate.na().fill(valueMap);
        circle_active_rate.createOrReplaceTempView("circle_active_rate");
        // Top 10 circles per member by activity. NOTE: the original inner select
        // listed RC_ID twice, making the outer RC_ID reference ambiguous.
        Dataset<Row> circle_active_result = spark.sql("select MID,CIRCLE_ID,num_per_circle,RC_ID,CIRCLE_HEAD,circle_active_num,active_rate from (select MID,CIRCLE_ID,num_per_circle,RC_ID,CIRCLE_HEAD,circle_active_num,active_rate,row_number() over (partition by MID order by circle_active_num desc) as rank from circle_active_rate) t where t.rank<=10");
        circle_active_result.createOrReplaceTempView("circle_active_result");
        // (我的圈子根据活跃度排行) write to MongoDB.
        MongoSpark.write(circle_active_result).option("collection", "circle_rank_activite_pri").mode("overwrite").save();
        // Public circle leaderboard, partitioned by scene type, straight from join 1.
        Dataset<Row> circle_scene_score_order_result_df = spark.sql("select SCENE_TYPE,CIRCLE_ID,num_per_circle,score,RC_id,CIRCLE_NAME,CIRCLE_HEAD from (select SCENE_TYPE,CIRCLE_ID,num_per_circle,ROUND(num_per_circle/" + circleTotalPeople + ",9) as score,RC_id,CIRCLE_NAME,CIRCLE_HEAD,row_number() over (partition by SCENE_TYPE order by num_per_circle desc) as rank from circle_data_and_peoplenum ) t where t.rank<=10");
        // (圈子根据等级排行,按照场景分) write to MongoDB.
        MongoSpark.write(circle_scene_score_order_result_df).option("collection", "circle_rank_grade_pub").mode("overwrite").save();

        // ---------- crowd (社群) rankings ----------
        // Crowd base table.
        options.put("dbtable", "fl_crowds");
        Dataset<Row> fl_crowds_df = spark.read().format("jdbc").options(options).load();
        fl_crowds_df.createOrReplaceTempView("fl_crowds");
        // Member <-> crowd relation table.
        options.put("dbtable", "fl_member_and_crowd");
        Dataset<Row> fl_member_and_crowd_df = spark.read().format("jdbc").options(options).load();
        fl_member_and_crowd_df.createOrReplaceTempView("fl_member_and_crowd");
        // Total members per crowd.
        Dataset<Row> crowd_statistic_usertotal = spark.sql("SELECT CROWD_ID,COUNT(*) as num_per_crowd from fl_member_and_crowd GROUP BY CROWD_ID");
        crowd_statistic_usertotal.createOrReplaceTempView("crowd_statistic_usertotal");
        // Join 1: active members mapped onto their crowds.
        spark.sql("select au.M_ID,mc.CROWD_ID from activite_users as au left join fl_member_and_crowd as mc on au.M_ID=mc.M_ID ").createOrReplaceTempView("crowd_activite");
        Dataset<Row> crowd_statistic_activite = spark.sql("select CROWD_ID,count(*) as activie_user_num from crowd_activite group by CROWD_ID");
        crowd_statistic_activite.createOrReplaceTempView("crowd_statistic_activite");
        // Join 2: total members + active members per crowd.
        Dataset<Row> crowd_numuser_activiuser = spark.sql("select csu.CROWD_ID,csu.num_per_crowd,csa.activie_user_num from crowd_statistic_usertotal as csu left join crowd_statistic_activite as csa on csu.CROWD_ID=csa.CROWD_ID order by activie_user_num");
        Map<String, Object> valueMap2 = new HashMap<>();
        valueMap2.put("activie_user_num", 0L);
        valueMap2.put("num_per_crowd", 0L);
        crowd_numuser_activiuser = crowd_numuser_activiuser.na().fill(valueMap2);
        crowd_numuser_activiuser.createOrReplaceTempView("crowd_numuser_activiuser");
        // Join 3: crowd detail + counts + activity rate.
        Dataset<Row> crowd_df = spark.sql("select fc.CROWD_LOGO,fc.RC_ID,fc.CROWD_ID,fc.CROWD_NAME,fc.CREATED_BY,cna.num_per_crowd,cna.activie_user_num,Round(cna.activie_user_num/cna.num_per_crowd,9) as crowd_active_rate from fl_crowds as fc left join crowd_numuser_activiuser as cna on fc.CROWD_ID=cna.CROWD_ID");
        Map<String, Object> valueMap3 = new HashMap<>();
        valueMap3.put("num_per_crowd", 0L);
        valueMap3.put("activie_user_num", 0L);
        valueMap3.put("crowd_active_rate", 0.0);
        crowd_df = crowd_df.na().fill(valueMap3);
        crowd_df.createOrReplaceTempView("crowd_df");
        // Join 4: per-member crowd rows with the creator-boosted activity score.
        Dataset<Row> crowd_activite_base_score = spark.sql("select fmc.M_ID,fmc.CROWD_ID,cd.CROWD_LOGO,cd.RC_ID,cd.CROWD_NAME,cd.CREATED_BY,cd.num_per_crowd,cd.activie_user_num,cd.crowd_active_rate,updateActiveRate(fmc.M_ID,cd.CREATED_BY,cd.crowd_active_rate) as crowd_active_rate_after from fl_member_and_crowd as fmc left join crowd_df as cd on fmc.CROWD_ID=cd.CROWD_ID ");
        // BUG FIX: the original built valueMap4 but then re-filled crowd_df with
        // valueMap3, leaving join-nulls in crowd_activite_base_score. Fill the
        // intended dataset instead.
        Map<String, Object> valueMap4 = new HashMap<>();
        valueMap4.put("num_per_crowd", 0L);
        valueMap4.put("activie_user_num", 0L);
        valueMap4.put("crowd_active_rate", 0.0);
        crowd_activite_base_score = crowd_activite_base_score.na().fill(valueMap4);
        crowd_activite_base_score.createOrReplaceTempView("crowd_activite_base_score");
        // Total membership across all crowds, collected on the driver for SQL inlining.
        Long crowdTotalPeople = (Long) spark.sql("select sum(num_per_crowd) as num from crowd_activite_base_score")
                .javaRDD().first().get(0);
        // Grade per crowd via the getCrowdGrade UDF.
        Dataset<Row> crowd_base_grade_df = spark.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,getCrowdGrade(num_per_crowd," + crowdTotalPeople + ") as grade from crowd_activite_base_score");
        crowd_base_grade_df.createOrReplaceTempView("crowd_base_grade");
        // Top 10 crowds per member by grade.
        Dataset<Row> crowd_grade_top_df = spark.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,grade from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,grade,row_number() over (partition by M_ID order by grade desc) as rank from crowd_base_grade) t where t.rank<=10");
        MongoSpark.write(crowd_grade_top_df).option("collection", "crowd_rank_grade").mode("overwrite").save();
        // Top 10 crowds per member by activity rate.
        Dataset<Row> crowd_rank_result = spark.sql("select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate from (select M_ID,CROWD_ID,CROWD_LOGO,RC_ID,CROWD_NAME,num_per_crowd,activie_user_num,crowd_active_rate,row_number() over (partition by M_ID order by crowd_active_rate desc) as rank from crowd_activite_base_score) t where t.rank<=10");
        crowd_rank_result.createOrReplaceTempView("crowd_rank_result");
        // (社群排行,根据活跃度) write to MongoDB.
        MongoSpark.write(crowd_rank_result).option("collection", "crowd_rank_active").mode("overwrite").save();

        // ---------- top-3 most active crowds inside each circle ----------
        // Circle <-> crowd relation table.
        options.put("dbtable", "fl_circle_crowds");
        Dataset<Row> fl_circle_crowds = spark.read().format("jdbc").options(options).load();
        fl_circle_crowds.createOrReplaceTempView("fl_circle_crowds");
        // Join crowd scores onto the circle-crowd relation.
        Dataset<Row> crowd_activite_three = spark.sql("select fcc.CIRCLE_ID,fcc.CROWDS_ID,cabs.M_ID,cabs.CROWD_LOGO,cabs.RC_ID,cabs.CROWD_NAME,cabs.CREATED_BY,cabs.num_per_crowd,cabs.activie_user_num,cabs.crowd_active_rate,cabs.crowd_active_rate_after from fl_circle_crowds as fcc left join crowd_activite_base_score as cabs on fcc.CROWDS_ID=cabs.CROWD_ID ");
        crowd_activite_three.createOrReplaceTempView("crowd_activite_three");
        // Top 3 per (member, circle).
        Dataset<Row> crowd_activite_three_result = spark.sql("select CIRCLE_ID,CROWDS_ID,M_ID,CROWD_LOGO,RC_ID,CROWD_NAME,CREATED_BY,num_per_crowd,activie_user_num,crowd_active_rate,crowd_active_rate_after from (select CIRCLE_ID,CROWDS_ID,M_ID,CROWD_LOGO,RC_ID,CROWD_NAME,CREATED_BY,num_per_crowd,activie_user_num,crowd_active_rate,crowd_active_rate_after,row_number() over (partition by M_ID,CIRCLE_ID order by crowd_active_rate desc) as rank from crowd_activite_three) t where t.rank<=3");
        // (社群排行,根据活跃度,活跃度前三) write to MongoDB.
        MongoSpark.write(crowd_activite_three_result).option("collection", "crowd_rank_active_three").mode("overwrite").save();
        sc.stop();
    }

    /**
     * Loads a properties file from the classpath.
     *
     * @param resource absolute classpath location, e.g. "/conf/localjdbc.properties"
     * @throws IOException if the resource is missing or unreadable
     */
    private static Properties loadProperties(String resource) throws IOException {
        Properties props = new Properties();
        // try-with-resources: the original leaked the InputStream.
        try (InputStream in = CircleCrowdRank.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("Missing classpath resource: " + resource);
            }
            props.load(in);
        }
        return props;
    }

    /**
     * Builds the SparkSession with the MongoDB input/output settings from props.
     *
     * @param master Spark master URL, or null to let spark-submit decide (cluster mode)
     */
    private static SparkSession buildSparkSession(Properties props, String master) {
        SparkSession.Builder builder = SparkSession.builder().appName("userRank");
        if (master != null) {
            builder.master(master);
        }
        return builder
                .config("spark.mongodb.input.uri", props.getProperty("spark.mongodb.input.uri"))
                .config("spark.mongodb.input.database", props.getProperty("spark.mongodb.input.database"))
                .config("spark.mongodb.input.collection", props.getProperty("spark.mongodb.input.collection"))
                .config("spark.mongodb.output.uri", props.getProperty("spark.mongodb.output.uri"))
                .config("spark.mongodb.output.database", props.getProperty("spark.mongodb.output.database"))
                .config("spark.mongodb.output.collection", props.getProperty("spark.mongodb.output.collection"))
                .getOrCreate();
    }

    /** JDBC connection options (url/driver/user/password) shared by every table load. */
    private static Map<String, String> jdbcOptions(Properties props) {
        Map<String, String> options = new HashMap<>();
        options.put("url", props.getProperty("url"));
        options.put("driver", props.getProperty("driver"));
        options.put("user", props.getProperty("user"));
        options.put("password", props.getProperty("password"));
        return options;
    }
}