package com.gkhy.safePlatform.specialWork.mq.consumer; import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils; import com.gkhy.safePlatform.commons.utils.JsonUtils; import com.gkhy.safePlatform.specialWork.entity.WorkApplyInfo; import com.gkhy.safePlatform.specialWork.entity.WorkApprovalUnitInfo; import com.gkhy.safePlatform.specialWork.enums.WorkApprovalStepResultEnum; import com.gkhy.safePlatform.specialWork.enums.WorkApprovalUnitResultEnum; import com.gkhy.safePlatform.specialWork.enums.WorkApplyStatusEnum; import com.gkhy.safePlatform.specialWork.mq.msg.ApprovalSpecialWorkMsg; import com.gkhy.safePlatform.specialWork.service.baseService.WorkApplyInfoService; import com.gkhy.safePlatform.specialWork.service.baseService.WorkApprovalStepInfoService; import com.gkhy.safePlatform.specialWork.service.baseService.WorkApprovalUnitInfoService; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; import java.util.List; @Service @RocketMQMessageListener(topic = "${rocketmq.topic.approvalSpecialWorkTopic}",consumerGroup = "${rocketmq.consumers.approvalSpecialWorkGroup}") public class ApprovalSpecialWorkConsumer implements RocketMQListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${rocketmq.topic.approvalSpecialWorkTopic}") private String approvalSpecialWorkTopic; @Autowired private WorkApplyInfoService workApplyInfoService; @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private WorkApprovalStepInfoService workApprovalStepInfoService; @Autowired private WorkApprovalUnitInfoService workApprovalUnitInfoService; @Override @Transactional public void onMessage(MessageExt message) { try { ApprovalSpecialWorkMsg msg = JsonUtils.parse(new String(message.getBody(), StandardCharsets.UTF_8), ApprovalSpecialWorkMsg.class); if(null != msg && msg.getWorkApplyId()>0){ //当前时间早于目标处理时间前0秒 Long relaySeconds = (msg.getApprovalEffectiveTime().getTime() - new Date().getTime())/1000; if(relaySeconds>0){ //重发消息,实现连续延时 Message reMsg = MessageBuilder.withPayload(msg).build(); logger.info("【##】创建分析人审批的消息延时"+ "MSG_ID: "+ message.getMsgId()); SendResult sendResult = rocketMQTemplate.syncSend(approvalSpecialWorkTopic, reMsg, 10000, CalcuteDelayLevelUtils.calcuteDelayLevel(relaySeconds)); if(sendResult.getSendStatus() != SendStatus.SEND_OK){ throw new RuntimeException("【特殊作业】重新发送分析人审批消息失败" + msg.getWorkApplyId() + "MSG_ID: "+ message.getMsgId()); } }else{//分析人审批有效时长过期 WorkApplyInfo applyInfo = workApplyInfoService.getById(msg.getWorkApplyId()); if(null != applyInfo){ //审批中作业 if(applyInfo.getStatus().equals(WorkApplyStatusEnum.STATU_IN_APPROVAL.getStatus()) && applyInfo.getApprovalStepId() != null) { List unitList = workApprovalUnitInfoService.listApprovalRuleUnitByStepId(applyInfo.getApprovalStepId()); List unitIds = new ArrayList<>(); for (WorkApprovalUnitInfo unit: unitList) { if(unit.getResult().equals(WorkApprovalUnitResultEnum.RESULT_IN_APPROVAL.getResult())){//审批中的单元 unitIds.add(unit.getId()); } } //作业 workApplyInfoService.updateStatusById(applyInfo.getId(), WorkApplyStatusEnum.STATU_EXPIRATION_DATE); //层级 workApprovalStepInfoService.updateStatusById(applyInfo.getApprovalStepId(),WorkApprovalStepResultEnum.RESULT_EXPIRATION_DATE); //单元 workApprovalUnitInfoService.updateStatusByIds(unitIds,WorkApprovalUnitResultEnum.RESULT_EXPIRATION_DATE); logger.info("workApplyId:"+msg.getWorkApplyId()+" 分析人审批有效时长过期"); } } } } } catch (Exception e){ logger.error("【特殊作业】申请消息消费异常!"+ "MSG_ID: "+ message.getMsgId()+e); } } }