[关闭]
@songhanshi 2021-01-07T17:25:14.000000Z 字数 9474 阅读 1339

eww

复盘


一、用户分析

  1. 背景:
    针对app、微信等多种渠道的用户行为数据进行采集;将各类行为数据进行整合,与公司内线下数据、公司外数据结合;开展行为数据分析,提供相应数据分析功能和工具;提供相关业务功能和可视化展示页面。

  2. 工程中用到的技术

    • 总:
      ① 数据流
      简
      详细
      ② 项目名称
      data_generator: 生成测试数据(php)
      dc_openresty:lua脚本
      log_service:数据收集(重点)
      realtime-task:flink实时写入(重点)
      skynet-backend: web工程(重点)
      das-api: 数据服务平台(重点)
      zsh_docker:一键部署
      ③ 知识点:
      lua脚本、filebeat、kafka、flink、impala、kudu、springboot、docker
    • 分:
      ① 客户端app+openresty
      客户端app会将埋点数据加密后,请求dc-skynet.rong360.com下/prod/send_data接口上传数据。
      nginx运行lua脚本获取数据,进行封装,写入log。脚本参见zsh_docker工程
      ② 日志收集+log-service
      app端或者客户服务端打点数据过来后,会在日志收集系统中进行数据的简单校验和转换,同时添加用户标识id。定义转换为放入 kafka 的消息格式。
      1> 数据收集
      后台脚本不断扫描数据文件是否有新的写入 com\example\log_service\common\ThreadPoolExecutor.java
      a.生产者不断扫描文件判断是否有新数据写入,如果有新数据写入将数据读取,并推送至阻塞队列中
      b.消费者主线程不断监听阻塞队列,如果有数据将数据取出发送给消费线程池进行消费
      c.消费线程负责数据处理、保存等
      2> 日志数据的处理
      1)日志消费分为sdk日志和nginx的log日志
      2)Nginx日志消费com\example\log_service\common\NgConsumeService.java
      a.数据的decode,unzip等操作,解析出json格式数据
      b.Sdk日志消费 com\example\log_service\common\SdkConsumeService.java
      将数据直接解析成json格式数据
      c.Json格式处理 com\example\log_service\domain\impl\LogServiceImpl.java
      3)数据存储,通过log4j日志插件将日志保存至文件中代码:
      com\example\log_service\common\writelogs\LogFileOperator.java
      在这里插入图片描述
      ③ filebeat:
      数据推送到kafka。配置参考zsh_docker工程
      ④ realtime-task:
      利用flink程序消费kafka,将数据存入kudu实时表。
      ⑤ skynet:
      元数据、漏斗分析、用户分群、事件分析、后端推送消息(websocket)
      在这里插入图片描述
      ⑥ das
      1> 背景:
      数据应用层在整个系统架构中承担了业务系统对数据层的访问逻辑。为适应各种业务的变化带来的数据应用层频繁开发新接口上线问题,数据应用层需要对现有的异构数据源(包括但不限于MySQL, Redis, Impala等)进行抽象,提供出公共数据服务接口,以应对业务应用层多变的数据查询需求。
      在这里插入图片描述
      ⑦ 大数据端
      1> 数据层交互架构图
      在这里插入图片描述
      2> 数仓架构图
      在这里插入图片描述
      3> 查询方式
      kudu表当天分区增量和hive历史数据做union all 建成视图,供外提供查询:
      在这里插入图片描述
  3. 负责的模块

    • 总:skynet:
      元数据、漏斗分析、用户分群、事件分析、后端推送消息(websocket)
      1> log-sevice
      log-service需要知道收集日志的校验格式,而其所以来的元数据是在skynet一方维护,因此当skynet的元数据出现新增,删除,修改等变更之后需要将信息同步到log-service。
      目前两者之间的通信方式是基于redis。
      2>任务调度azkaban
      由于DAS目前不提供DDL 操作以及插入等功能,在用户分群模块需要将符合条件的用户圈选出来。
      目前使用SQL查询并插入的方式进行用户分群。其中会涉及到新建的情况
      azkaban调度结束后,skynet通过轮训调度任务的方式进行获取状态,再通过DAS查询出具体的结果集。
      3>消息推送websocket
      对于一些异常操作的场景,如漏斗计算,需要等待大数据计算完结果之后,将消息推送到前端,目前使用websocket的方式实现。
      4>技术栈
      Spring Boot + Mybatis-plus + Websocket + Maven
      在这里插入图片描述
    • 事件分析:
      ① 需求:
      在这里插入图片描述
      ② 实现
      1)查询条件:skynet提供三个接口:
      1> 获取所有事件
      2> 获取单个事件相关指标
      3> 获取多个事件指标公共的属性
      2)查询:(异步、同步)
      在这里插入图片描述
      1> 按小时维度作时间窗口,进行缓存。
      2> 单个页面的查询作为1个任务,按事件指标分为多个子任务。保存子任务的查询的条件。
      3> 单个子任务查询完成后,保存结果到task_info表,若存在下一个子任务,则继续调用das接口进行查询。
      4> 全部子任务查询完毕后,发送消息。
      5> 若可从缓存中获取结果,则同步返回,否则走异步流程。
      6> 若查询是和用户相关,则返回特殊标记及子任务id,以便后续进行“查看用户列表”和“添加分群”功能。
      7> 消息处理流程,暂定与漏斗分析保持一致。
      3)查看用户列表:通过sql模板调用接口查询用户(同步)。
      在这里插入图片描述
      4)添加分群(同步)
      在这里插入图片描述
      ③ 大数据端
      1 查询引擎:impala,中间结果表存储在:htp 库。das落地中间表关键字 htp.tmp_skynet_event_analysis_d (日表) _event_analysis_,格式:stored as parquet
      2 数据表:
      数据存储规范:kudu实时来源表,暂定存储周期三个月。
      范围:最新全量用户表,最新全量用户扩展表,离线和实时事件表
      中间表:无
      4模板拼接:
      1)逻辑,小于3个月的数据直接查询kudu的视图表,时间跨度大于3个月且包含实时数据的查询使用,离线+当周期的视图表,不包含本周期的实时数据且查询时间大于3个月直接查询离线表。
      2)涉及到交叉表的数据展示,待das和前端评审确认
      5 查询性能:预估分钟级
      6 数据工作:离线流程开发,模板开发
      7 页面必须默认事件
      数据流程图:
      在这里插入图片描述

    • 元数据
      ① 需求
      1> 元数据使用流程:
      在这里插入图片描述
      2> 元数据上下游关系
      在这里插入图片描述
      ② 技术方案
      在这里插入图片描述

二、技术点

Quartz

  1. Quartz
    https://www.cnblogs.com/zhanghaoliang/p/7886110.html

分布式锁

1-使用

代码段3:

  1. @Scheduled(cron = "0 0/5 * * * ?")
  2. public void LogServiceUserAttrTrackSchedule() {
  3. String key = "ssssss";
  4. String req = UUID.randomUUID().toString();
  5. try {
  6. int expiration = 30 * 60;
  7. boolean holdLock = this.distributeLock.tryLock(key, req, expiration);
  8. if (holdLock) {
  9. // 业务逻辑
  10. // ....
  11. }
  12. } finally {
  13. distributeLock.unlock(key, req);
  14. }
  15. }
2 创建接口
  1. package com.rong360.dataplatform.service.lock;
  2. /**
  3. * @author lixiaowen
  4. * @date 2020/6/17
  5. */
  6. public interface DistributeLock {
  7. /**
  8. * 非阻塞持有锁
  9. *
  10. * @param key
  11. * @param req
  12. * @param expiration
  13. * @return
  14. */
  15. boolean tryLock(String key,String req, int expiration);
  16. /**
  17. * 非阻塞请求锁
  18. *
  19. * @param key
  20. * @param req
  21. * @return
  22. */
  23. boolean tryLock(String key, String req);
  24. /**
  25. * 阻塞请求锁
  26. *
  27. * @param key
  28. * @param req
  29. * @param expiration
  30. * @param timeout 阻塞时长
  31. */
  32. boolean tryLock(String key, String req, int expiration, int timeout);
  33. /**
  34. * 释放锁
  35. *
  36. * @param key
  37. * @param req
  38. * @return
  39. */
  40. boolean unlock(String key,String req);
  41. }
3 实现接口
  1. package com.rong360.dataplatform.service.lock;
  2. import com.google.common.collect.Lists;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.data.redis.core.script.DefaultRedisScript;
  7. import org.springframework.data.redis.core.script.RedisScript;
  8. import org.springframework.stereotype.Component;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * 不可重入分布式锁的实现
  12. *
  13. * @author lixiaowen
  14. * @date 2020/6/17
  15. */
  16. @Slf4j
  17. @Component
  18. public class RedisDistributeLock implements DistributeLock {
  19. @Autowired
  20. private RedisTemplate<String, String> redisTemplate;
  21. /**
  22. * 默认key过期时间,单位秒
  23. * 5min
  24. */
  25. private int defaultExpiration = 300;
  26. /**
  27. * 非阻塞请求锁
  28. *
  29. * @param key
  30. * @param req
  31. * @return
  32. */
  33. @Override
  34. public boolean tryLock(String key, String req) {
  35. return tryLock(key, req, defaultExpiration);
  36. }
  37. /**
  38. * 非阻塞请求锁
  39. *
  40. * @param key
  41. * @param req
  42. * @param expiration
  43. * @return
  44. */
  45. @Override
  46. public boolean tryLock(String key, String req, int expiration) {
  47. Boolean state = redisTemplate.opsForValue().setIfAbsent(key, req, expiration, TimeUnit.SECONDS);
  48. if (state != null && state) {
  49. log.info("持有分布式锁{}, req:{}:", key, req);
  50. return true;
  51. }
  52. return false;
  53. }
  54. /**
  55. * 阻塞请求锁
  56. *
  57. * @param key
  58. * @param req
  59. * @param expiration
  60. * @param timeout 阻塞时长
  61. */
  62. @Override
  63. public boolean tryLock(String key, String req, int expiration, int timeout) {
  64. long start = System.currentTimeMillis();
  65. //毫秒
  66. int period = 10;
  67. for (; ; ) {
  68. boolean lock = tryLock(key, req, expiration);
  69. if (lock) return true;
  70. if (System.currentTimeMillis() - start >= (timeout * 1000)) {
  71. break;
  72. }
  73. try {
  74. TimeUnit.MILLISECONDS.sleep(period);
  75. } catch (InterruptedException e) {
  76. return false;
  77. }
  78. }
  79. return false;
  80. }
  81. /**
  82. * 删除分布式锁
  83. *
  84. * @param key
  85. * @param req
  86. * @return
  87. */
  88. @Override
  89. public boolean unlock(String key, String req) {
  90. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
  91. RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);
  92. Boolean execute = redisTemplate.execute(redisScript, Lists.newArrayList(key), req);
  93. boolean status = execute == null ? false : execute;
  94. if (status) {
  95. log.debug("删除分布式键{}成功:{}", key, req);
  96. }
  97. return status;
  98. }
  99. }

线程

单线程
  1. /tSign
    代码段1:
  1. new Thread() {
  2. public void run() {
  3. // 保存逻辑
  4. }
  5. }.start();
线程池
1-创建设置
  1. public class ThreadPoolUtil {
  2. private static int corePoolSize = 4;
  3. private static int maximumPoolSize = 32;
  4. private static long keepAliveTime = 60;
  5. private static TimeUnit unit = TimeUnit.SECONDS;
  6. private static int maximumTask = 5000;
  7. private static AtomicInteger tid = new AtomicInteger();
  8. private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
  9. unit, new ArrayBlockingQueue<>(maximumTask), r -> new Thread(r,"skynet-pool-" + tid.incrementAndGet()), new ThreadPoolExecutor.AbortPolicy());
  10. public static Future<?> submit(Runnable r) {
  11. return executor.submit(r);
  12. }
  13. public static <T> Future<T> submit(Callable<T> task) {
  14. return executor.submit(task);
  15. }
  16. public static void main(String[] args) throws InterruptedException {
  17. Runnable runnable = new Runnable() {
  18. @Override
  19. public void run() {
  20. System.out.println(1000);
  21. }
  22. };
  23. submit(runnable);
  24. TimeUnit.SECONDS.sleep(3);
  25. }
  26. }
2-使用

代码段2:

  1. //不阻塞返回回调结果
  2. ThreadPoolUtil.submit(() -> {
  3. //2. 解析data
  4. // 找到该任务id 更新查询状态
  5. //3.推送消息
  6. // 创建任务相关消息
  7. // 保存到消息表
  8. // 监听接口:通过套接字、监听消息异步通知(发布到redis)
  9. String topic = RedisWsMessageListener.TOPIC_SKYNET_WS_MSG;
  10. String msg = JSON.toJSONString(message);
  11. log.info("推送的消息msg:{},写入topic:{},", msg, topic);
  12. //发布到redis
  13. redisTemplate.convertAndSend(topic, msg);
  14. }
  15. });

代码段

代码段1
  1. new Thread() {
  2. public void run() {
  3. JSONObject fadada_ret = null;
  4. logger.info("tSign: fadada thread start sign");
  5. fadada_ret = getJsonObject(fadada_ret, params, retObj);
  6. if (fadada_ret == null) {
  7. fadada_ret = JSONObject.fromObject("{\"code\":\"-1\"}");
  8. } else {
  9. if (fadada_ret.containsKey("casenum")
  10. && StringUtils.isNotBlank(fadada_ret.optString("casenum"))) {
  11. fadada_ret.put("msg", fadada_ret.optString("casenum"));
  12. }
  13. }
  14. params.putAll(fadada_ret);
  15. String logid = NumberUtils.getFixLenthString(26);
  16. logger.info("法大大存储时间戳wd打印传入logid:"+logid);
  17. //保存到tsign表
  18. JSONObject jsonObject = saveToWD(SAVE_TSIGN_URL+"?logid="+logid, params, "fadada");
  19. if (jsonObject.optJSONObject("data").optInt("errorcode") !=0){
  20. logger.warn("法大大时间戳服务盖戳保存wd返回:"+jsonObject.toString());
  21. jsonObject = saveToWD(SAVE_TSIGN_URL+"?logid="+logid, params, "fadada");
  22. if (jsonObject.optJSONObject("data").optInt("errorcode") !=0){
  23. logger.warn("法大大盖戳保存wd重试失败!!失败源码:"+jsonObject.toString());
  24. }
  25. }
  26. }
  27. }.start();
代码段2
  1. //不阻塞返回回调结果
  2. ThreadPoolUtil.submit(() -> {
  3. //2. 解析data
  4. TaskInfo taskInfo = getByTaskId(taskId);
  5. if (taskInfo == null) {
  6. log.error("{}没有找到调度任务",taskId);
  7. return;
  8. }
  9. log.info("查询回调任务信息:{}", JSON.toJSONString(taskInfo));
  10. int status = param.getCode() == 0 ? TaskInfo.TaskState.SUCCESS : TaskInfo.TaskState.FAILED;
  11. if (status == TaskInfo.TaskState.SUCCESS) {
  12. List<Map<String, Integer>> fd = null;
  13. try {
  14. fd = JSON.parseObject(param.getData(), List.class);
  15. taskInfo.setMsg("计算成功");
  16. taskInfo.setResult(JSON.toJSONString(fd));
  17. } catch (Throwable e) {
  18. log.warn("格式解析失败,data需要是list格式的json数据",e);
  19. status = TaskInfo.TaskState.FAILED;
  20. }
  21. }
  22. if (status == TaskInfo.TaskState.FAILED){
  23. taskInfo.setMsg("计算失败");
  24. taskInfo.setResult(param.getData());
  25. }
  26. taskInfo.setState(status);
  27. saveOrUpdate(taskInfo);
  28. String clientId = taskInfo.getCid();
  29. //3.推送消息
  30. if (StringUtils.isEmpty(clientId)) {
  31. log.warn("clientId is null, can not push message to client,taskInfo:{}", JSON.toJSONString(taskInfo));
  32. } else {
  33. Message message = buildMessage(taskInfo);
  34. //保存到message中
  35. try {
  36. messageService.save(message);
  37. } catch (Throwable e) {
  38. e.printStackTrace();
  39. }
  40. String topic = RedisWsMessageListener.TOPIC_SKYNET_WS_MSG;
  41. String msg = JSON.toJSONString(message);
  42. log.info("推送的消息msg:{},写入topic:{},", msg, topic);
  43. //发布到redis
  44. redisTemplate.convertAndSend(topic, msg);
  45. }
  46. });
代码段3
  1. @Scheduled(cron = "0 0/5 * * * ?")
  2. public void LogServiceUserAttrTrackSchedule() {
  3. String key = "skynet_metadata_event_user_attr_track_task";
  4. String req = UUID.randomUUID().toString();
  5. try {
  6. int expiration = 30 * 60;
  7. boolean holdLock = this.distributeLock.tryLock(key, req, expiration);
  8. if (holdLock) {
  9. log.info("本实例请求到的分布式锁,执行调度任务");
  10. String evtUserAttrName = "event_user_attr_track";
  11. List<EventUserAttr> eventUserAttrs = eventUserAttrService.list();
  12. Map<String, String> param = eventUserAttrs.stream().collect(toMap(EventUserAttr::getName, EventUserAttr::getType, (s, a) -> s + ", " + a));
  13. redisTemplate.opsForHash().putAll(evtUserAttrName, param);
  14. log.info("用户属性名称:" + evtUserAttrName + ",存入redis," + "属性参数:" + param);
  15. }
  16. } finally {
  17. distributeLock.unlock(key, req);
  18. }
  19. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注