package com.gkhy.safePlatform.specialWork.mq.consumer;

import com.gkhy.safePlatform.commons.enums.ResultCodes;
import com.gkhy.safePlatform.commons.exception.BusinessException;
import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils;
import com.gkhy.safePlatform.commons.utils.JsonUtils;
import com.gkhy.safePlatform.specialWork.entity.WorkAnalysisConsumerLog;
import com.gkhy.safePlatform.specialWork.entity.WorkAnalysisRecordInfo;
import com.gkhy.safePlatform.specialWork.entity.WorkInfo;
import com.gkhy.safePlatform.specialWork.enums.WorkingAbortStatusEnum;
import com.gkhy.safePlatform.specialWork.enums.WorkingAnalysisStatusEnum;
import com.gkhy.safePlatform.specialWork.enums.WorkingStatusEnum;
import com.gkhy.safePlatform.specialWork.mq.msg.AnalysisExpireMsg;
import com.gkhy.safePlatform.specialWork.mq.service.WorkAnalysisProducerService;
import com.gkhy.safePlatform.specialWork.service.RedisService;
import com.gkhy.safePlatform.specialWork.service.baseService.WorkAnalysisConsumerLogService;
import com.gkhy.safePlatform.specialWork.service.baseService.WorkAnalysisRecordInfoService;
import com.gkhy.safePlatform.specialWork.service.baseService.WorkInfoService;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;

/**
 * RocketMQ listener for the {@code analysisExpireTopic} topic.
 *
 * <p>Each message carries a work id, an analysis id and an expiry time. When the expiry
 * time has been reached and the message refers to the latest analysis record of a work
 * that is not yet finished, the work is marked as analysis-expired and finished with the
 * "analysis overdue" abort status. When the expiry time has not been reached yet, the
 * message is re-sent as a delayed message. Every message that is processed without an
 * exception is recorded in the cache and in the consumer log, and both are consulted up
 * front to drop repeated deliveries.
 *
 * <p>Any exception raised during processing is logged and swallowed, so the failure is not
 * propagated to the caller of {@link #onMessage(MessageExt)}.
 */
@Service
@RocketMQMessageListener(topic = "analysisExpireTopic", consumerGroup = "analysisExpireConsumerGroup")
public class WorkAnalysisExpireConsumer implements RocketMQListener<MessageExt> {

    private static final Logger log = LoggerFactory.getLogger(WorkAnalysisExpireConsumer.class);

    /** Prefix of the cache key used for the duplicate-consumption check. */
    private static final String IDEMPOTENT_KEY_PREFIX = "ide:consumer:";

    /**
     * Expiry passed to the cache when a message is recorded as consumed.
     * NOTE(review): the time unit is defined by RedisService and is not visible here - confirm.
     */
    private static final long IDEMPOTENT_KEY_EXPIRE = 3L;

    @Autowired
    private WorkAnalysisProducerService workAnalysisProducerService;

    @Autowired
    private WorkInfoService workInfoService;

    @Autowired
    private WorkAnalysisRecordInfoService workAnalysisRecordInfoService;

    @Autowired
    private RedisService redisService;

    @Autowired
    private WorkAnalysisConsumerLogService workAnalysisConsumerLogService;

    /**
     * Consumes one analysis-expiry message.
     *
     * @param messageExt the raw message; its body is decoded as UTF-8 JSON describing an
     *                   {@link AnalysisExpireMsg}
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            // 0. Capture "now" once so every comparison and the log entry share one timestamp.
            LocalDateTime now = LocalDateTime.now();
            String json = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            AnalysisExpireMsg msg = JsonUtils.parse(json, AnalysisExpireMsg.class);

            // 1. Basic parameter validation.
            checkMessageContent(msg);

            // 1.0 Idempotency check: cache first, then the persisted consumer log.
            String messageKey = msg.getMessageKey();
            String idempotentCacheKey = IDEMPOTENT_KEY_PREFIX + messageKey;
            if (isAlreadyConsumed(idempotentCacheKey, messageKey)) {
                log.info("模块[特殊作业] 任务[分析过期消息重复消费] =>抛弃");
                return;
            }

            LocalDateTime expireTime = msg.getExpireTime();
            if (!expireTime.isAfter(now)) {
                // The analysis expiry time has been reached.
                expireWorkIfLatestAnalysis(msg, messageExt);
            } else {
                // Not due yet: schedule another delayed delivery.
                redeliverWithDelay(msg, messageExt, now, expireTime);
            }

            // 2. Record the consumption (cache entry + consumer log row).
            recordConsumption(msg, messageKey, idempotentCacheKey, now);
        } catch (Exception e) {
            // todo: persist the message and compensate with a scheduled job.
            // The exception is deliberately not rethrown; it is logged with its stack trace.
            log.error("异常模块[特殊作业] 任务[分析过期消息延时投递失败] 当前消息id:{}", messageExt.getMsgId(), e);
        }
    }

    /**
     * Tells whether the message was consumed before, checking the cache first and falling
     * back to the consumer log when the cache has no entry.
     */
    private boolean isAlreadyConsumed(String idempotentCacheKey, String messageKey) {
        if (redisService.getCacheKey(idempotentCacheKey) != null) {
            // A cache hit means this is a repeated delivery.
            return true;
        }
        // Cache miss: fall through to the persisted log.
        return workAnalysisConsumerLogService.countByMessageKey(messageKey) > 0;
    }

    /**
     * Marks the work as analysis-expired and finished, provided the message refers to the
     * latest analysis record of the work and the work is not already finished.
     */
    private void expireWorkIfLatestAnalysis(AnalysisExpireMsg msg, MessageExt messageExt) {
        // 1. Look up the most recent analysis record of the work.
        WorkAnalysisRecordInfo latestRecord =
                workAnalysisRecordInfoService.getLatestAnalysisDataByWorkId(msg.getWorkId());

        // 1.1 Only act when this message belongs to that latest record.
        if (latestRecord == null || !msg.getAnalysisId().equals(latestRecord.getId())) {
            log.info("模块[特殊作业] 任务[分析过期执行] 作业编号:{} 当前消费消息id:{} 非最新分析记录消息=>丢弃",
                    msg.getWorkPermitNo(), messageExt.getMsgId());
            return;
        }

        WorkInfo workInfo = workInfoService.getById(msg.getWorkId());
        if (workInfo == null || workInfo.getWorkStatus().equals(WorkingStatusEnum.ALREADY_FINISHED.code)) {
            // Unknown work or already finished: nothing to expire.
            return;
        }

        // Per the original business note: until a work finishes, analysis must keep being
        // performed and pass, so only unfinished works reach the expiry operation.
        workInfoService.updateWorkAnalysisStatusByWorkId(
                workInfo.getId(), WorkingAnalysisStatusEnum.ANALYSIS_EXPIRED);
        // Analysis overdue => finish the work, flagged as aborted due to overdue analysis.
        workInfoService.updateWorkStatusAndAbortStatusByWorkId(
                workInfo.getId(), WorkingStatusEnum.ALREADY_FINISHED, WorkingAbortStatusEnum.ANALYSIS_OVERDUE);
        log.info("模块[特殊作业] 任务[分析过期执行] 作业编号:{} 当前消费消息id:{}",
                msg.getWorkPermitNo(), messageExt.getMsgId());
    }

    /**
     * Re-sends the message as a delayed message, with the delay level derived from the
     * number of seconds remaining until {@code expireTime}.
     */
    private void redeliverWithDelay(AnalysisExpireMsg msg, MessageExt messageExt,
                                    LocalDateTime now, LocalDateTime expireTime) {
        long secondsRemaining = Duration.between(now, expireTime).getSeconds();
        int delayLevel = CalcuteDelayLevelUtils.calcuteDelayLevel(secondsRemaining);
        SendResult sendResult = workAnalysisProducerService.syncSendWorkExpireMsgDelay(msg, delayLevel);
        log.info("模块[特殊作业] 任务[分析过期消息延时投递] 延时等级:{} 作业编号:{} 当前消息id:{} 发送消息id:{}",
                delayLevel, msg.getWorkPermitNo(), messageExt.getMsgId(), sendResult.getMsgId());
    }

    /**
     * Records that the message was consumed: writes the cache entry first, then saves a
     * consumer log row stamped with {@code consumedAt}.
     */
    private void recordConsumption(AnalysisExpireMsg msg, String messageKey,
                                   String idempotentCacheKey, LocalDateTime consumedAt) {
        redisService.setConsumerMessageCacheKeyAndExpireTime(idempotentCacheKey, IDEMPOTENT_KEY_EXPIRE);

        WorkAnalysisConsumerLog consumerLog = new WorkAnalysisConsumerLog();
        consumerLog.setWorkId(msg.getWorkId());
        consumerLog.setAnalysisId(msg.getAnalysisId());
        consumerLog.setMessageKey(messageKey);
        consumerLog.setWorkPermitNo(msg.getWorkPermitNo());
        consumerLog.setGmtCreate(consumedAt);
        workAnalysisConsumerLogService.saveConsumerLog(consumerLog);
    }

    /**
     * Basic parameter validation.
     *
     * @param analysisExpireMsg the parsed message
     * @throws BusinessException if the message itself, its work id, its expiry time or its
     *                           analysis id is {@code null}
     */
    private void checkMessageContent(AnalysisExpireMsg analysisExpireMsg) {
        if (analysisExpireMsg == null
                || analysisExpireMsg.getWorkId() == null
                || analysisExpireMsg.getExpireTime() == null
                || analysisExpireMsg.getAnalysisId() == null) {
            throw new BusinessException(ResultCodes.SERVER_PARAM_NULL.getCode(), "生产服务端入参为空");
        }
    }
}