郑永安
2023-06-19 7a6abd05683528032687c75e80e0bd2030a3e46c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.gkhy.safePlatform.specialWork.mq.consumer;
 
 
import com.gkhy.safePlatform.commons.enums.ResultCodes;
import com.gkhy.safePlatform.commons.exception.BusinessException;
import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils;
import com.gkhy.safePlatform.commons.utils.JsonUtils;
import com.gkhy.safePlatform.specialWork.entity.WorkAnalysisConsumerLog;
import com.gkhy.safePlatform.specialWork.entity.WorkAnalysisRecordInfo;
import com.gkhy.safePlatform.specialWork.entity.WorkInfo;
import com.gkhy.safePlatform.specialWork.enums.WorkingAbortStatusEnum;
import com.gkhy.safePlatform.specialWork.enums.WorkingAnalysisStatusEnum;
import com.gkhy.safePlatform.specialWork.enums.WorkingStatusEnum;
import com.gkhy.safePlatform.specialWork.mq.msg.AnalysisExpireMsg;
import com.gkhy.safePlatform.specialWork.mq.service.WorkAnalysisProducerService;
import com.gkhy.safePlatform.specialWork.service.RedisService;
import com.gkhy.safePlatform.specialWork.service.baseService.WorkAnalysisConsumerLogService;
import com.gkhy.safePlatform.specialWork.service.baseService.WorkAnalysisRecordInfoService;
import com.gkhy.safePlatform.specialWork.service.baseService.WorkInfoService;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
 
@Service
@RocketMQMessageListener(topic = "analysisExpireTopic",consumerGroup = "analysisExpireConsumerGroup")
public class WorkAnalysisExpireConsumer implements RocketMQListener<MessageExt> {
 
    @Autowired
    private WorkAnalysisProducerService workAnalysisProducerService;
    @Autowired
    private WorkInfoService workInfoService;
    @Autowired
    private WorkAnalysisRecordInfoService workAnalysisRecordInfoService;
    @Autowired
    private RedisService redisService;
    @Autowired
    private WorkAnalysisConsumerLogService workAnalysisConsumerLogService;
 
 
    private static Logger log = LoggerFactory.getLogger(WorkAnalysisExpireConsumer.class);
 
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
 
            // 0.数据准备 和 解析
            LocalDateTime currentLocalDateTime = LocalDateTime.now();
            byte[] body = messageExt.getBody();
            String jsonString = new String(body, StandardCharsets.UTF_8);
            AnalysisExpireMsg analysisExpireMsg = JsonUtils.parse(jsonString, AnalysisExpireMsg.class);
 
            // 1.参数校验
            this.checkMessageContent(analysisExpireMsg);
            // 1.0 幂等校验
            String messageKey = analysisExpireMsg.getMessageKey();
            String ideCacheKey = "ide:consumer:" + messageKey;
            String cacheKey = redisService.getCacheKey(ideCacheKey);
            if (cacheKey != null) {
                // 缓存存在则是重复消费
                log.info("模块[特殊作业] 任务[分析过期消息重复消费] =>抛弃");
                return;
            }
 
 
            // 穿透后校验
            if (workAnalysisConsumerLogService.countByMessageKey(analysisExpireMsg.getMessageKey()) > 0) {
                log.info("模块[特殊作业] 任务[分析过期消息重复消费] =>抛弃");
                return;
            }
 
 
            LocalDateTime expireTime = analysisExpireMsg.getExpireTime();
            // 分析过期时间到
            if (expireTime.isBefore(currentLocalDateTime) || expireTime.isEqual(currentLocalDateTime)) {
                // 1. 查找最新的一条分析记录
                WorkAnalysisRecordInfo latestRecord = workAnalysisRecordInfoService.getLatestAnalysisDataByWorkId(analysisExpireMsg.getWorkId());
                // 1.1 本次消费执行任务 对应 最新分析记录 才执行
                if (latestRecord != null && analysisExpireMsg.getAnalysisId().equals(latestRecord.getId())) {
                    //  直接过期
                    WorkInfo workInfo = workInfoService.getById(analysisExpireMsg.getWorkId());
                    if (workInfo != null && !workInfo.getWorkStatus().equals(WorkingStatusEnum.ALREADY_FINISHED.code)) {
                        // 作业结束前 需要不间断的执行分析 并且合格
                        // 根据业务逻辑 => 能执行分析过期操作的 只有分析合格的作业(作业没结束的作业)
                        workInfoService.updateWorkAnalysisStatusByWorkId(workInfo.getId(), WorkingAnalysisStatusEnum.ANALYSIS_EXPIRED);
                        // 分析超期 => 作业结束(异常结束标识为分析超期)
                        workInfoService.updateWorkStatusAndAbortStatusByWorkId(workInfo.getId(), WorkingStatusEnum.ALREADY_FINISHED, WorkingAbortStatusEnum.ANALYSIS_OVERDUE);
                        // 打印日志
                        log.info("模块[特殊作业] 任务[分析过期执行] 作业编号:{} 当前消费消息id:{}",analysisExpireMsg.getWorkPermitNo(),messageExt.getMsgId());
                    }
 
                }else{
                    log.info("模块[特殊作业] 任务[分析过期执行] 作业编号:{} 当前消费消息id:{} 非最新分析记录消息=>丢弃",analysisExpireMsg.getWorkPermitNo(),messageExt.getMsgId());
                }
 
 
            }else{
                // 还没到分析过期时间
                Duration distance = Duration.between(currentLocalDateTime, expireTime);
                // 秒
                long secondsDistance = distance.getSeconds();
                // 延时等级
                int delayLevel = CalcuteDelayLevelUtils.calcuteDelayLevel(secondsDistance);
                // 重新发送延时消息 执行
                SendResult sendResult = workAnalysisProducerService.syncSendWorkExpireMsgDelay(analysisExpireMsg, delayLevel);
                // 打log
                log.info("模块[特殊作业] 任务[分析过期消息延时投递] 延时等级:{} 作业编号:{} 当前消息id:{} 发送消息id:{}",
                        delayLevel, analysisExpireMsg.getWorkPermitNo(), messageExt.getMsgId(), sendResult.getMsgId());
            }
 
 
            // 消费记录
            // 存入缓存
            // 第一次消费 存入缓存
            redisService.setConsumerMessageCacheKeyAndExpireTime(ideCacheKey, 3L);
            WorkAnalysisConsumerLog consumerLog = new WorkAnalysisConsumerLog();
            consumerLog.setWorkId(analysisExpireMsg.getWorkId());
            consumerLog.setAnalysisId(analysisExpireMsg.getAnalysisId());
            consumerLog.setMessageKey(messageKey);
            consumerLog.setWorkPermitNo(analysisExpireMsg.getWorkPermitNo());
            consumerLog.setGmtCreate(currentLocalDateTime);
            workAnalysisConsumerLogService.saveConsumerLog(consumerLog);
        } catch (Exception e) {
            e.printStackTrace();
            // todo 消息持久化 定时补偿
            log.info("异常模块[特殊作业] 任务[分析过期消息延时投递失败] 当前消息id:{}", messageExt.getMsgId());
        }
 
 
 
    }
 
 
    // 基本参数校验
    private void checkMessageContent(AnalysisExpireMsg analysisExpireMsg) {
        if (analysisExpireMsg.getWorkId() == null ||
                analysisExpireMsg.getExpireTime() == null ||
                analysisExpireMsg.getAnalysisId() == null) {
            throw new BusinessException(ResultCodes.SERVER_PARAM_NULL.getCode(),"生产服务端入参为空");
        }
    }
}