package com.gkhy.safePlatform.equipment.mq.consumer; import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils; import com.gkhy.safePlatform.commons.utils.JsonUtils; import com.gkhy.safePlatform.equipment.entity.SafeMaterialDetailInfo; import com.gkhy.safePlatform.equipment.enums.ValidStatusEnum; import com.gkhy.safePlatform.equipment.mq.msg.SafeMaterialMsg; import com.gkhy.safePlatform.equipment.service.baseService.SafeMaterialDetailInfoService; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.nio.charset.StandardCharsets; import java.util.Date; @Service @RocketMQMessageListener(topic = "${rocketmq.topic.safeMaterialTopic}",consumerGroup = "${rocketmq.consumer.safeMaterialGroup}") public class SafeMaterialConsumer implements RocketMQListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${rocketmq.topic.safeMaterialTopic}") private String safeMaterialTopic; @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private SafeMaterialDetailInfoService safeMaterialDetailInfoService; @Override @Transactional public void onMessage(MessageExt message) { try{ SafeMaterialMsg msg = JsonUtils.parse(new String(message.getBody(), StandardCharsets.UTF_8),SafeMaterialMsg.class); if(null != msg && msg.getId()>0){ //当前时间早于目标处理时间前0秒 Long relaySeconds = (msg.getValidTime().getTime() - new Date().getTime())/1000; if(relaySeconds>0){ //重发消息,实现连续延时 Message reMsg = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSend(safeMaterialTopic, reMsg, 10000, CalcuteDelayLevelUtils.calcuteDelayLevel(relaySeconds)); if(sendResult.getSendStatus() != SendStatus.SEND_OK){ throw new RuntimeException("【安全物资管理】重发消息失败" + msg.getId()+ "MSG_ID: "+ message.getMsgId()); } }else{//超时 SafeMaterialDetailInfo materialDetailInfo = safeMaterialDetailInfoService.queryById(msg.getId()); if(null != materialDetailInfo){ safeMaterialDetailInfoService.updateValidStatus(msg.getId()); logger.info("物资详细数据id:"+msg.getId()+" 物资过期!"); } } } }catch (Exception e){ logger.error("【安全物资和设备管理】消息消费异常!"+ "MSG_ID: "+ message.getMsgId()+e); } } }