@songhanshi
2021-01-07T17:25:14.000000Z
字数 9474
阅读 1417
复盘
背景:
针对app、微信等多种渠道的用户行为数据进行采集;将各类行为数据进行整合,与公司内线下数据、公司外数据结合;开展行为数据分析,提供相应数据分析功能和工具;提供相关业务功能和可视化展示页面。
工程中用到的技术

负责的模块

事件分析:
① 需求:
② 实现
1)查询条件:skynet提供三个接口:
1> 获取所有事件
2> 获取单个事件相关指标
3> 获取多个事件指标公共的属性
2)查询:(异步、同步)
1> 按小时维度作时间窗口,进行缓存。
2> 单个页面的查询作为1个任务,按事件指标分为多个子任务。保存子任务的查询的条件。
3> 单个子任务查询完成后,保存结果到task_info表,若存在下一个子任务,则继续调用das接口进行查询。
4> 全部子任务查询完毕后,发送消息。
5> 若可从缓存中获取结果,则同步返回,否则走异步流程。
6> 若查询是和用户相关,则返回特殊标记及子任务id,以便后续进行“查看用户列表”和“添加分群”功能。
7> 消息处理流程,暂定与漏斗分析保持一致。
3)查看用户列表:通过sql模板调用接口查询用户(同步)。
4)添加分群(同步)
③ 大数据端
1 查询引擎:impala,中间结果表存储在:htp 库。das落地中间表关键字 htp.tmp_skynet_event_analysis_d (日表) _event_analysis_,格式:stored as parquet
2 数据表:
数据存储规范:kudu实时来源表,暂定存储周期三个月。
范围:最新全量用户表,最新全量用户扩展表,离线和实时事件表
中间表:无
4 模板拼接:
1)逻辑:查询时间跨度小于3个月时,直接查询kudu的视图表;时间跨度大于3个月且包含实时数据时,查询离线表+当周期的视图表;时间跨度大于3个月且不包含本周期的实时数据时,直接查询离线表。
2)涉及到交叉表的数据展示,待das和前端评审确认
5 查询性能:预估分钟级
6 数据工作:离线流程开发,模板开发
7 页面必须有默认事件
数据流程图:

元数据
① 需求
1> 元数据使用流程:
2> 元数据上下游关系
② 技术方案

代码段3:
@Scheduled(cron = "0 0/5 * * * ?")public void LogServiceUserAttrTrackSchedule() {String key = "ssssss";String req = UUID.randomUUID().toString();try {int expiration = 30 * 60;boolean holdLock = this.distributeLock.tryLock(key, req, expiration);if (holdLock) {// 业务逻辑// ....}} finally {distributeLock.unlock(key, req);}}
package com.rong360.dataplatform.service.lock;

/**
 * A named distributed lock guarded by a holder token.
 *
 * <p>{@code key} names the lock and {@code req} is a caller-chosen token identifying the
 * holder. The Redis implementation ({@code RedisDistributeLock}) releases a lock only when
 * the token given to {@link #unlock(String, String)} matches the one used to acquire it.
 *
 * @author lixiaowen
 * @date 2020/6/17
 */
public interface DistributeLock {

    /**
     * Makes a single, non-waiting attempt to acquire the lock, using the implementation's
     * default expiration.
     *
     * @param key lock name
     * @param req holder token
     * @return {@code true} if this call acquired the lock
     */
    boolean tryLock(String key, String req);

    /**
     * Makes a single, non-waiting attempt to acquire the lock with an explicit expiration.
     *
     * @param key        lock name
     * @param req        holder token
     * @param expiration lock lifetime (interpreted as seconds by {@code RedisDistributeLock})
     * @return {@code true} if this call acquired the lock
     */
    boolean tryLock(String key, String req, int expiration);

    /**
     * Keeps retrying acquisition until it succeeds or {@code timeout} has elapsed.
     *
     * @param key        lock name
     * @param req        holder token
     * @param expiration lock lifetime (interpreted as seconds by {@code RedisDistributeLock})
     * @param timeout    how long to keep retrying (interpreted as seconds by
     *                   {@code RedisDistributeLock})
     * @return {@code true} if the lock was acquired before the timeout
     */
    boolean tryLock(String key, String req, int expiration, int timeout);

    /**
     * Releases the lock if it is still held under the given token.
     *
     * @param key lock name
     * @param req holder token used when acquiring
     * @return {@code true} if this call released the lock
     */
    boolean unlock(String key, String req);
}
package com.rong360.dataplatform.service.lock;import com.google.common.collect.Lists;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.data.redis.core.script.RedisScript;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** 不可重入分布式锁的实现** @author lixiaowen* @date 2020/6/17*/@Slf4j@Componentpublic class RedisDistributeLock implements DistributeLock {@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 默认key过期时间,单位秒* 5min*/private int defaultExpiration = 300;/*** 非阻塞请求锁** @param key* @param req* @return*/@Overridepublic boolean tryLock(String key, String req) {return tryLock(key, req, defaultExpiration);}/*** 非阻塞请求锁** @param key* @param req* @param expiration* @return*/@Overridepublic boolean tryLock(String key, String req, int expiration) {Boolean state = redisTemplate.opsForValue().setIfAbsent(key, req, expiration, TimeUnit.SECONDS);if (state != null && state) {log.info("持有分布式锁{}, req:{}:", key, req);return true;}return false;}/*** 阻塞请求锁** @param key* @param req* @param expiration* @param timeout 阻塞时长*/@Overridepublic boolean tryLock(String key, String req, int expiration, int timeout) {long start = System.currentTimeMillis();//毫秒int period = 10;for (; ; ) {boolean lock = tryLock(key, req, expiration);if (lock) return true;if (System.currentTimeMillis() - start >= (timeout * 1000)) {break;}try {TimeUnit.MILLISECONDS.sleep(period);} catch (InterruptedException e) {return false;}}return false;}/*** 删除分布式锁** @param key* @param req* @return*/@Overridepublic boolean unlock(String key, String req) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);Boolean execute = 
redisTemplate.execute(redisScript, Lists.newArrayList(key), req);boolean status = execute == null ? false : execute;if (status) {log.debug("删除分布式键{}成功:{}", key, req);}return status;}}
new Thread() {public void run() {// 保存逻辑}}.start();
/**
 * Process-wide bounded thread pool for background work.
 *
 * <p>Shape: 4 core threads, at most 32 threads, idle non-core threads reclaimed after 60
 * seconds, and a queue of at most 5000 pending tasks. Worker threads are named
 * {@code skynet-pool-N}. When the queue is full and all 32 threads are busy, or after the
 * pool has been shut down, {@code submit} throws
 * {@link java.util.concurrent.RejectedExecutionException} (abort policy).
 *
 * <p>A task's exception is captured in the returned {@link Future}. Callers that discard
 * the future never see it, so such tasks should catch and log their own errors.
 */
public final class ThreadPoolUtil {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAXIMUM_POOL_SIZE = 32;
    private static final long KEEP_ALIVE_TIME = 60;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
    private static final int MAXIMUM_QUEUED_TASKS = 5000;
    /** Sequence that gives every worker thread a distinct name. */
    private static final AtomicInteger THREAD_ID = new AtomicInteger();

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT,
            new ArrayBlockingQueue<>(MAXIMUM_QUEUED_TASKS),
            r -> new Thread(r, "skynet-pool-" + THREAD_ID.incrementAndGet()),
            new ThreadPoolExecutor.AbortPolicy());

    /** Static utility; not instantiable. */
    private ThreadPoolUtil() {
    }

    /**
     * Submits a task with no result.
     *
     * @param r the task
     * @return a future that completes with {@code null}, or with the task's exception
     */
    public static Future<?> submit(Runnable r) {
        return EXECUTOR.submit(r);
    }

    /**
     * Submits a task that produces a value.
     *
     * @param task the task
     * @param <T>  result type
     * @return a future holding the task's result or exception
     */
    public static <T> Future<T> submit(Callable<T> task) {
        return EXECUTOR.submit(task);
    }

    /**
     * Small manual demo: runs one task, then shuts the pool down.
     *
     * <p>Worker threads are non-daemon and core threads never time out. Without an explicit
     * shutdown, the JVM would keep running after this method returns.
     */
    public static void main(String[] args) throws InterruptedException {
        submit(() -> System.out.println(1000));
        EXECUTOR.shutdown();
        if (!EXECUTOR.awaitTermination(3, TimeUnit.SECONDS)) {
            EXECUTOR.shutdownNow();
        }
    }
}
代码段2:
//不阻塞返回回调结果ThreadPoolUtil.submit(() -> {//2. 解析data// 找到该任务id 更新查询状态//3.推送消息// 创建任务相关消息// 保存到消息表// 监听接口:通过套接字、监听消息异步通知(发布到redis)String topic = RedisWsMessageListener.TOPIC_SKYNET_WS_MSG;String msg = JSON.toJSONString(message);log.info("推送的消息msg:{},写入topic:{},", msg, topic);//发布到redisredisTemplate.convertAndSend(topic, msg);}});
// Fire-and-forget: obtain the fadada signing result and persist it to the tsign table
// (retrying the save once) on a background thread.
new Thread(() -> {
    try {
        // Kept as a typed null local so the same getJsonObject overload is selected as before.
        JSONObject fadadaRet = null;
        logger.info("tSign: fadada thread start sign");
        fadadaRet = getJsonObject(fadadaRet, params, retObj);
        if (fadadaRet == null) {
            fadadaRet = JSONObject.fromObject("{\"code\":\"-1\"}");
        } else if (fadadaRet.containsKey("casenum")
                && StringUtils.isNotBlank(fadadaRet.optString("casenum"))) {
            // Expose the case number as the message.
            fadadaRet.put("msg", fadadaRet.optString("casenum"));
        }
        params.putAll(fadadaRet);

        String logid = NumberUtils.getFixLenthString(26);
        logger.info("法大大存储时间戳wd打印传入logid:" + logid);
        String saveUrl = SAVE_TSIGN_URL + "?logid=" + logid;

        // Save to the tsign table; on failure retry exactly once with the same logid.
        // A missing response or missing "data" object counts as failure instead of throwing an NPE.
        JSONObject saveRet = saveToWD(saveUrl, params, "fadada");
        JSONObject saveData = saveRet == null ? null : saveRet.optJSONObject("data");
        if (saveData == null || saveData.optInt("errorcode") != 0) {
            logger.warn("法大大时间戳服务盖戳保存wd返回:" + saveRet);
            saveRet = saveToWD(saveUrl, params, "fadada");
            saveData = saveRet == null ? null : saveRet.optJSONObject("data");
            if (saveData == null || saveData.optInt("errorcode") != 0) {
                logger.warn("法大大盖戳保存wd重试失败!!失败源码:" + saveRet);
            }
        }
    } catch (RuntimeException e) {
        // Without this, failures would go to the thread's default handler, not the application log.
        logger.error("tSign: fadada thread failed", e);
    }
}).start();
//不阻塞返回回调结果ThreadPoolUtil.submit(() -> {//2. 解析dataTaskInfo taskInfo = getByTaskId(taskId);if (taskInfo == null) {log.error("{}没有找到调度任务",taskId);return;}log.info("查询回调任务信息:{}", JSON.toJSONString(taskInfo));int status = param.getCode() == 0 ? TaskInfo.TaskState.SUCCESS : TaskInfo.TaskState.FAILED;if (status == TaskInfo.TaskState.SUCCESS) {List<Map<String, Integer>> fd = null;try {fd = JSON.parseObject(param.getData(), List.class);taskInfo.setMsg("计算成功");taskInfo.setResult(JSON.toJSONString(fd));} catch (Throwable e) {log.warn("格式解析失败,data需要是list格式的json数据",e);status = TaskInfo.TaskState.FAILED;}}if (status == TaskInfo.TaskState.FAILED){taskInfo.setMsg("计算失败");taskInfo.setResult(param.getData());}taskInfo.setState(status);saveOrUpdate(taskInfo);String clientId = taskInfo.getCid();//3.推送消息if (StringUtils.isEmpty(clientId)) {log.warn("clientId is null, can not push message to client,taskInfo:{}", JSON.toJSONString(taskInfo));} else {Message message = buildMessage(taskInfo);//保存到message中try {messageService.save(message);} catch (Throwable e) {e.printStackTrace();}String topic = RedisWsMessageListener.TOPIC_SKYNET_WS_MSG;String msg = JSON.toJSONString(message);log.info("推送的消息msg:{},写入topic:{},", msg, topic);//发布到redisredisTemplate.convertAndSend(topic, msg);}});
/**
 * Every five minutes (cron "0 0/5 * * * ?"), copies all event user attributes into the
 * Redis hash {@code event_user_attr_track}, mapping attribute name to type.
 *
 * <p>Only the instance that acquires the distributed lock does the work. The lock is
 * released only by that instance, so a failed {@code tryLock} is neither followed by a
 * pointless {@code unlock} nor masked by one.
 */
@Scheduled(cron = "0 0/5 * * * ?")
public void LogServiceUserAttrTrackSchedule() {
    String key = "skynet_metadata_event_user_attr_track_task";
    // Unique per run so that only this run's token can release the lock.
    String req = UUID.randomUUID().toString();
    // Lock TTL in seconds (30 minutes); bounds how long a crashed holder keeps the lock.
    int expiration = 30 * 60;
    if (!this.distributeLock.tryLock(key, req, expiration)) {
        return;
    }
    try {
        log.info("本实例请求到的分布式锁,执行调度任务");
        String evtUserAttrName = "event_user_attr_track";
        List<EventUserAttr> eventUserAttrs = eventUserAttrService.list();
        // Attribute names become hash fields; rows sharing a name have their types joined with ", ".
        Map<String, String> param = eventUserAttrs.stream()
                .collect(toMap(EventUserAttr::getName, EventUserAttr::getType, (s, a) -> s + ", " + a));
        // NOTE(review): putAll only adds/overwrites fields; attributes removed from the source
        // table stay in the hash. Confirm that is intended.
        redisTemplate.opsForHash().putAll(evtUserAttrName, param);
        log.info("用户属性名称:{},存入redis,属性参数:{}", evtUserAttrName, param);
    } finally {
        distributeLock.unlock(key, req);
    }
}