package com.gkhy.safePlatform.specialWork.mq.consumer;
|
|
|
import com.gkhy.safePlatform.commons.enums.ResultCodes;
|
import com.gkhy.safePlatform.commons.exception.BusinessException;
|
import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils;
|
import com.gkhy.safePlatform.commons.utils.JsonUtils;
|
import com.gkhy.safePlatform.specialWork.entity.WorkAnalysisConsumerLog;
|
import com.gkhy.safePlatform.specialWork.entity.WorkAnalysisRecordInfo;
|
import com.gkhy.safePlatform.specialWork.entity.WorkInfo;
|
import com.gkhy.safePlatform.specialWork.enums.WorkingAbortStatusEnum;
|
import com.gkhy.safePlatform.specialWork.enums.WorkingAnalysisStatusEnum;
|
import com.gkhy.safePlatform.specialWork.enums.WorkingStatusEnum;
|
import com.gkhy.safePlatform.specialWork.mq.msg.AnalysisExpireMsg;
|
import com.gkhy.safePlatform.specialWork.mq.service.WorkAnalysisProducerService;
|
import com.gkhy.safePlatform.specialWork.service.RedisService;
|
import com.gkhy.safePlatform.specialWork.service.baseService.WorkAnalysisConsumerLogService;
|
import com.gkhy.safePlatform.specialWork.service.baseService.WorkAnalysisRecordInfoService;
|
import com.gkhy.safePlatform.specialWork.service.baseService.WorkInfoService;
|
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.common.message.MessageExt;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.nio.charset.StandardCharsets;
|
import java.time.Duration;
|
import java.time.LocalDateTime;
|
|
@Service
|
@RocketMQMessageListener(topic = "analysisExpireTopic",consumerGroup = "analysisExpireConsumerGroup")
|
public class WorkAnalysisExpireConsumer implements RocketMQListener<MessageExt> {
|
|
@Autowired
|
private WorkAnalysisProducerService workAnalysisProducerService;
|
@Autowired
|
private WorkInfoService workInfoService;
|
@Autowired
|
private WorkAnalysisRecordInfoService workAnalysisRecordInfoService;
|
@Autowired
|
private RedisService redisService;
|
@Autowired
|
private WorkAnalysisConsumerLogService workAnalysisConsumerLogService;
|
|
|
private static Logger log = LoggerFactory.getLogger(WorkAnalysisExpireConsumer.class);
|
|
@Override
|
public void onMessage(MessageExt messageExt) {
|
try {
|
|
// 0.数据准备 和 解析
|
LocalDateTime currentLocalDateTime = LocalDateTime.now();
|
byte[] body = messageExt.getBody();
|
String jsonString = new String(body, StandardCharsets.UTF_8);
|
AnalysisExpireMsg analysisExpireMsg = JsonUtils.parse(jsonString, AnalysisExpireMsg.class);
|
|
// 1.参数校验
|
this.checkMessageContent(analysisExpireMsg);
|
// 1.0 幂等校验
|
String messageKey = analysisExpireMsg.getMessageKey();
|
String ideCacheKey = "ide:consumer:" + messageKey;
|
String cacheKey = redisService.getCacheKey(ideCacheKey);
|
if (cacheKey != null) {
|
// 缓存存在则是重复消费
|
log.info("模块[特殊作业] 任务[分析过期消息重复消费] =>抛弃");
|
return;
|
}
|
|
|
// 穿透后校验
|
if (workAnalysisConsumerLogService.countByMessageKey(analysisExpireMsg.getMessageKey()) > 0) {
|
log.info("模块[特殊作业] 任务[分析过期消息重复消费] =>抛弃");
|
return;
|
}
|
|
|
LocalDateTime expireTime = analysisExpireMsg.getExpireTime();
|
// 分析过期时间到
|
if (expireTime.isBefore(currentLocalDateTime) || expireTime.isEqual(currentLocalDateTime)) {
|
// 1. 查找最新的一条分析记录
|
WorkAnalysisRecordInfo latestRecord = workAnalysisRecordInfoService.getLatestAnalysisDataByWorkId(analysisExpireMsg.getWorkId());
|
// 1.1 本次消费执行任务 对应 最新分析记录 才执行
|
if (latestRecord != null && analysisExpireMsg.getAnalysisId().equals(latestRecord.getId())) {
|
// 直接过期
|
WorkInfo workInfo = workInfoService.getById(analysisExpireMsg.getWorkId());
|
if (workInfo != null && !workInfo.getWorkStatus().equals(WorkingStatusEnum.ALREADY_FINISHED.code)) {
|
// 作业结束前 需要不间断的执行分析 并且合格
|
// 根据业务逻辑 => 能执行分析过期操作的 只有分析合格的作业(作业没结束的作业)
|
workInfoService.updateWorkAnalysisStatusByWorkId(workInfo.getId(), WorkingAnalysisStatusEnum.ANALYSIS_EXPIRED);
|
// 分析超期 => 作业结束(异常结束标识为分析超期)
|
workInfoService.updateWorkStatusAndAbortStatusByWorkId(workInfo.getId(), WorkingStatusEnum.ALREADY_FINISHED, WorkingAbortStatusEnum.ANALYSIS_OVERDUE);
|
// 打印日志
|
log.info("模块[特殊作业] 任务[分析过期执行] 作业编号:{} 当前消费消息id:{}",analysisExpireMsg.getWorkPermitNo(),messageExt.getMsgId());
|
}
|
|
}else{
|
log.info("模块[特殊作业] 任务[分析过期执行] 作业编号:{} 当前消费消息id:{} 非最新分析记录消息=>丢弃",analysisExpireMsg.getWorkPermitNo(),messageExt.getMsgId());
|
}
|
|
|
}else{
|
// 还没到分析过期时间
|
Duration distance = Duration.between(currentLocalDateTime, expireTime);
|
// 秒
|
long secondsDistance = distance.getSeconds();
|
// 延时等级
|
int delayLevel = CalcuteDelayLevelUtils.calcuteDelayLevel(secondsDistance);
|
// 重新发送延时消息 执行
|
SendResult sendResult = workAnalysisProducerService.syncSendWorkExpireMsgDelay(analysisExpireMsg, delayLevel);
|
// 打log
|
log.info("模块[特殊作业] 任务[分析过期消息延时投递] 延时等级:{} 作业编号:{} 当前消息id:{} 发送消息id:{}",
|
delayLevel, analysisExpireMsg.getWorkPermitNo(), messageExt.getMsgId(), sendResult.getMsgId());
|
}
|
|
|
// 消费记录
|
// 存入缓存
|
// 第一次消费 存入缓存
|
redisService.setConsumerMessageCacheKeyAndExpireTime(ideCacheKey, 3L);
|
WorkAnalysisConsumerLog consumerLog = new WorkAnalysisConsumerLog();
|
consumerLog.setWorkId(analysisExpireMsg.getWorkId());
|
consumerLog.setAnalysisId(analysisExpireMsg.getAnalysisId());
|
consumerLog.setMessageKey(messageKey);
|
consumerLog.setWorkPermitNo(analysisExpireMsg.getWorkPermitNo());
|
consumerLog.setGmtCreate(currentLocalDateTime);
|
workAnalysisConsumerLogService.saveConsumerLog(consumerLog);
|
} catch (Exception e) {
|
e.printStackTrace();
|
// todo 消息持久化 定时补偿
|
log.info("异常模块[特殊作业] 任务[分析过期消息延时投递失败] 当前消息id:{}", messageExt.getMsgId());
|
}
|
|
|
|
}
|
|
|
// 基本参数校验
|
private void checkMessageContent(AnalysisExpireMsg analysisExpireMsg) {
|
if (analysisExpireMsg.getWorkId() == null ||
|
analysisExpireMsg.getExpireTime() == null ||
|
analysisExpireMsg.getAnalysisId() == null) {
|
throw new BusinessException(ResultCodes.SERVER_PARAM_NULL.getCode(),"生产服务端入参为空");
|
}
|
}
|
}
|