@songhanshi
2021-01-07T17:25:14.000000Z
字数 9474
阅读 1339
复盘
背景:
针对app、微信等多种渠道的用户行为数据进行采集;将各类行为数据进行整合,与公司内线下数据、公司外数据结合;开展行为数据分析,提供相应数据分析功能和工具;提供相关业务功能和可视化展示页面。
工程中用到的技术
负责的模块
事件分析:
① 需求:
② 实现
1)查询条件:skynet提供三个接口:
1> 获取所有事件
2> 获取单个事件相关指标
3> 获取多个事件指标公共的属性
2)查询:(异步、同步)
1> 按小时维度作时间窗口,进行缓存。
2> 单个页面的查询作为1个任务,按事件指标分为多个子任务。保存子任务的查询的条件。
3> 单个子任务查询完成后,保存结果到task_info表,若存在下一个子任务,则继续调用das接口进行查询。
4> 全部子任务查询完毕后,发送消息。
5> 若可从缓存中获取结果,则同步返回,否则走异步流程。
6> 若查询是和用户相关,则返回特殊标记及子任务id,以便后续进行“查看用户列表”和“添加分群”功能。
7> 消息处理流程,暂定与漏斗分析保持一致。
3)查看用户列表:通过sql模板调用接口查询用户(同步)。
4)添加分群(同步)
③ 大数据端
1 查询引擎:impala,中间结果表存储在:htp 库。das落地中间表关键字 htp.tmp_skynet_event_analysis_d (日表) _event_analysis_,格式:stored as parquet
2 数据表:
数据存储规范:kudu实时来源表,暂定存储周期三个月。
范围:最新全量用户表,最新全量用户扩展表,离线和实时事件表
中间表:无
4模板拼接:
1)逻辑,小于3个月的数据直接查询kudu的视图表,时间跨度大于3个月且包含实时数据的查询使用,离线+当周期的视图表,不包含本周期的实时数据且查询时间大于3个月直接查询离线表。
2)涉及到交叉表的数据展示,待das和前端评审确认
5 查询性能:预估分钟级
6 数据工作:离线流程开发,模板开发
7 页面必须默认事件
数据流程图:
元数据
① 需求
1> 元数据使用流程:
2> 元数据上下游关系
② 技术方案
代码段3:
@Scheduled(cron = "0 0/5 * * * ?")
public void LogServiceUserAttrTrackSchedule() {
String key = "ssssss";
String req = UUID.randomUUID().toString();
try {
int expiration = 30 * 60;
boolean holdLock = this.distributeLock.tryLock(key, req, expiration);
if (holdLock) {
// 业务逻辑
// ....
}
} finally {
distributeLock.unlock(key, req);
}
}
package com.rong360.dataplatform.service.lock;
/**
* @author lixiaowen
* @date 2020/6/17
*/
public interface DistributeLock {
/**
* 非阻塞持有锁
*
* @param key
* @param req
* @param expiration
* @return
*/
boolean tryLock(String key,String req, int expiration);
/**
* 非阻塞请求锁
*
* @param key
* @param req
* @return
*/
boolean tryLock(String key, String req);
/**
* 阻塞请求锁
*
* @param key
* @param req
* @param expiration
* @param timeout 阻塞时长
*/
boolean tryLock(String key, String req, int expiration, int timeout);
/**
* 释放锁
*
* @param key
* @param req
* @return
*/
boolean unlock(String key,String req);
}
package com.rong360.dataplatform.service.lock;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 不可重入分布式锁的实现
*
* @author lixiaowen
* @date 2020/6/17
*/
@Slf4j
@Component
public class RedisDistributeLock implements DistributeLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 默认key过期时间,单位秒
* 5min
*/
private int defaultExpiration = 300;
/**
* 非阻塞请求锁
*
* @param key
* @param req
* @return
*/
@Override
public boolean tryLock(String key, String req) {
return tryLock(key, req, defaultExpiration);
}
/**
* 非阻塞请求锁
*
* @param key
* @param req
* @param expiration
* @return
*/
@Override
public boolean tryLock(String key, String req, int expiration) {
Boolean state = redisTemplate.opsForValue().setIfAbsent(key, req, expiration, TimeUnit.SECONDS);
if (state != null && state) {
log.info("持有分布式锁{}, req:{}:", key, req);
return true;
}
return false;
}
/**
* 阻塞请求锁
*
* @param key
* @param req
* @param expiration
* @param timeout 阻塞时长
*/
@Override
public boolean tryLock(String key, String req, int expiration, int timeout) {
long start = System.currentTimeMillis();
//毫秒
int period = 10;
for (; ; ) {
boolean lock = tryLock(key, req, expiration);
if (lock) return true;
if (System.currentTimeMillis() - start >= (timeout * 1000)) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(period);
} catch (InterruptedException e) {
return false;
}
}
return false;
}
/**
* 删除分布式锁
*
* @param key
* @param req
* @return
*/
@Override
public boolean unlock(String key, String req) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);
Boolean execute = redisTemplate.execute(redisScript, Lists.newArrayList(key), req);
boolean status = execute == null ? false : execute;
if (status) {
log.debug("删除分布式键{}成功:{}", key, req);
}
return status;
}
}
new Thread() {
public void run() {
// 保存逻辑
}
}.start();
public class ThreadPoolUtil {
private static int corePoolSize = 4;
private static int maximumPoolSize = 32;
private static long keepAliveTime = 60;
private static TimeUnit unit = TimeUnit.SECONDS;
private static int maximumTask = 5000;
private static AtomicInteger tid = new AtomicInteger();
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, new ArrayBlockingQueue<>(maximumTask), r -> new Thread(r,"skynet-pool-" + tid.incrementAndGet()), new ThreadPoolExecutor.AbortPolicy());
public static Future<?> submit(Runnable r) {
return executor.submit(r);
}
public static <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(1000);
}
};
submit(runnable);
TimeUnit.SECONDS.sleep(3);
}
}
代码段2:
//不阻塞返回回调结果
ThreadPoolUtil.submit(() -> {
//2. 解析data
// 找到该任务id 更新查询状态
//3.推送消息
// 创建任务相关消息
// 保存到消息表
// 监听接口:通过套接字、监听消息异步通知(发布到redis)
String topic = RedisWsMessageListener.TOPIC_SKYNET_WS_MSG;
String msg = JSON.toJSONString(message);
log.info("推送的消息msg:{},写入topic:{},", msg, topic);
//发布到redis
redisTemplate.convertAndSend(topic, msg);
}
});
new Thread() {
public void run() {
JSONObject fadada_ret = null;
logger.info("tSign: fadada thread start sign");
fadada_ret = getJsonObject(fadada_ret, params, retObj);
if (fadada_ret == null) {
fadada_ret = JSONObject.fromObject("{\"code\":\"-1\"}");
} else {
if (fadada_ret.containsKey("casenum")
&& StringUtils.isNotBlank(fadada_ret.optString("casenum"))) {
fadada_ret.put("msg", fadada_ret.optString("casenum"));
}
}
params.putAll(fadada_ret);
String logid = NumberUtils.getFixLenthString(26);
logger.info("法大大存储时间戳wd打印传入logid:"+logid);
//保存到tsign表
JSONObject jsonObject = saveToWD(SAVE_TSIGN_URL+"?logid="+logid, params, "fadada");
if (jsonObject.optJSONObject("data").optInt("errorcode") !=0){
logger.warn("法大大时间戳服务盖戳保存wd返回:"+jsonObject.toString());
jsonObject = saveToWD(SAVE_TSIGN_URL+"?logid="+logid, params, "fadada");
if (jsonObject.optJSONObject("data").optInt("errorcode") !=0){
logger.warn("法大大盖戳保存wd重试失败!!失败源码:"+jsonObject.toString());
}
}
}
}.start();
//不阻塞返回回调结果
ThreadPoolUtil.submit(() -> {
//2. 解析data
TaskInfo taskInfo = getByTaskId(taskId);
if (taskInfo == null) {
log.error("{}没有找到调度任务",taskId);
return;
}
log.info("查询回调任务信息:{}", JSON.toJSONString(taskInfo));
int status = param.getCode() == 0 ? TaskInfo.TaskState.SUCCESS : TaskInfo.TaskState.FAILED;
if (status == TaskInfo.TaskState.SUCCESS) {
List<Map<String, Integer>> fd = null;
try {
fd = JSON.parseObject(param.getData(), List.class);
taskInfo.setMsg("计算成功");
taskInfo.setResult(JSON.toJSONString(fd));
} catch (Throwable e) {
log.warn("格式解析失败,data需要是list格式的json数据",e);
status = TaskInfo.TaskState.FAILED;
}
}
if (status == TaskInfo.TaskState.FAILED){
taskInfo.setMsg("计算失败");
taskInfo.setResult(param.getData());
}
taskInfo.setState(status);
saveOrUpdate(taskInfo);
String clientId = taskInfo.getCid();
//3.推送消息
if (StringUtils.isEmpty(clientId)) {
log.warn("clientId is null, can not push message to client,taskInfo:{}", JSON.toJSONString(taskInfo));
} else {
Message message = buildMessage(taskInfo);
//保存到message中
try {
messageService.save(message);
} catch (Throwable e) {
e.printStackTrace();
}
String topic = RedisWsMessageListener.TOPIC_SKYNET_WS_MSG;
String msg = JSON.toJSONString(message);
log.info("推送的消息msg:{},写入topic:{},", msg, topic);
//发布到redis
redisTemplate.convertAndSend(topic, msg);
}
});
@Scheduled(cron = "0 0/5 * * * ?")
public void LogServiceUserAttrTrackSchedule() {
String key = "skynet_metadata_event_user_attr_track_task";
String req = UUID.randomUUID().toString();
try {
int expiration = 30 * 60;
boolean holdLock = this.distributeLock.tryLock(key, req, expiration);
if (holdLock) {
log.info("本实例请求到的分布式锁,执行调度任务");
String evtUserAttrName = "event_user_attr_track";
List<EventUserAttr> eventUserAttrs = eventUserAttrService.list();
Map<String, String> param = eventUserAttrs.stream().collect(toMap(EventUserAttr::getName, EventUserAttr::getType, (s, a) -> s + ", " + a));
redisTemplate.opsForHash().putAll(evtUserAttrName, param);
log.info("用户属性名称:" + evtUserAttrName + ",存入redis," + "属性参数:" + param);
}
} finally {
distributeLock.unlock(key, req);
}
}