package com.gkhy.safePlatform.specialWork.mq.consumer; import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils; import com.gkhy.safePlatform.commons.utils.JsonUtils; import com.gkhy.safePlatform.specialWork.entity.WorkApplyInfo; import com.gkhy.safePlatform.specialWork.entity.WorkApprovalUnitInfo; import com.gkhy.safePlatform.specialWork.enums.WorkApprovalStepResultEnum; import com.gkhy.safePlatform.specialWork.enums.WorkApprovalUnitResultEnum; import com.gkhy.safePlatform.specialWork.enums.WorkApplyStatusEnum; import com.gkhy.safePlatform.specialWork.mq.msg.ApplySpecialWorkMsg; import com.gkhy.safePlatform.specialWork.service.baseService.WorkApplyInfoService; import com.gkhy.safePlatform.specialWork.service.baseService.WorkApprovalStepInfoService; import com.gkhy.safePlatform.specialWork.service.baseService.WorkApprovalUnitInfoService; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; import java.util.List; @Service @RocketMQMessageListener(topic = "${rocketmq.topic.applySpecialWorkTopic}",consumerGroup = "${rocketmq.consumers.applySpecialWorkGroup}") public class ApplySpecialWorkConsumer implements RocketMQListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${rocketmq.topic.applySpecialWorkTopic}") private String applySpecialWorkTopic; @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private WorkApplyInfoService workApplyInfoService; @Autowired private WorkApprovalStepInfoService workApprovalStepInfoService; @Autowired private WorkApprovalUnitInfoService workApprovalUnitInfoService; @Override @Transactional public void onMessage(MessageExt message) { try{ ApplySpecialWorkMsg msg = JsonUtils.parse(new String(message.getBody(), StandardCharsets.UTF_8),ApplySpecialWorkMsg.class); if(null != msg && msg.getWorkApplyId()>0){ //当前时间早于目标处理时间前0秒 Long relaySeconds = (msg.getExpStartTime().getTime() - new Date().getTime())/1000; if(relaySeconds>0){ //重发消息,实现连续延时 Message reMsg = MessageBuilder.withPayload(msg).build(); logger.info("【##】创建作业申请的消息延时"); SendResult sendResult = rocketMQTemplate.syncSend(applySpecialWorkTopic, reMsg, 10000, CalcuteDelayLevelUtils.calcuteDelayLevel(relaySeconds)); if(sendResult.getSendStatus() != SendStatus.SEND_OK){ throw new RuntimeException("【特殊作业】重发作业申请消息失败" + msg.getWorkApplyId()+ "MSG_ID: "+ message.getMsgId()); } }else{//超时 WorkApplyInfo applyInfo = workApplyInfoService.getById(msg.getWorkApplyId()); if(null != applyInfo){ //审批中作业 if(applyInfo.getStatus().equals(WorkApplyStatusEnum.STATU_IN_APPROVAL.getStatus()) && applyInfo.getApprovalStepId() != null) { List unitList = workApprovalUnitInfoService.listApprovalRuleUnitByStepId(applyInfo.getApprovalStepId()); List unitIds = new ArrayList<>(); for (WorkApprovalUnitInfo unit: unitList) { if(unit.getResult().equals(WorkApprovalUnitResultEnum.RESULT_IN_APPROVAL.getResult())){//审批中的单元 unitIds.add(unit.getId()); } } //作业超时 workApplyInfoService.updateStatusById(applyInfo.getId(), WorkApplyStatusEnum.STATU_OVER_TIME); //层级超时 workApprovalStepInfoService.updateStatusById(applyInfo.getApprovalStepId(),WorkApprovalStepResultEnum.RESULT_OVER_TIME); //单元 workApprovalUnitInfoService.updateStatusByIds(unitIds,WorkApprovalUnitResultEnum.RESULT_OVER_TIME); logger.info("workApplyId:"+msg.getWorkApplyId()+" 作业超时!"); } } } } }catch (Exception e){ logger.error("【特殊作业】审批消息消费异常!"+ "MSG_ID: "+ message.getMsgId()+e); } } }