From 5bd5f3bcd6d2cb375feb0756505691b551339716 Mon Sep 17 00:00:00 2001 From: zhangfeng <1603559716@qq.com> Date: 星期五, 23 十二月 2022 08:55:23 +0800 Subject: [PATCH] Merge branch 'master' of https://sinanoaq.cn:8888/r/safePlatform-out into zf --- equipment/equipment-service/src/main/java/com/gkhy/safePlatform/equipment/mq/consumer/SafeMaterialConsumer.java | 72 ++++++++++++++++++++++++++++++++++++ 1 files changed, 72 insertions(+), 0 deletions(-) diff --git a/equipment/equipment-service/src/main/java/com/gkhy/safePlatform/equipment/mq/consumer/SafeMaterialConsumer.java b/equipment/equipment-service/src/main/java/com/gkhy/safePlatform/equipment/mq/consumer/SafeMaterialConsumer.java new file mode 100644 index 0000000..e70357a --- /dev/null +++ b/equipment/equipment-service/src/main/java/com/gkhy/safePlatform/equipment/mq/consumer/SafeMaterialConsumer.java @@ -0,0 +1,72 @@ +package com.gkhy.safePlatform.equipment.mq.consumer; + +import com.gkhy.safePlatform.commons.utils.CalcuteDelayLevelUtils; +import com.gkhy.safePlatform.commons.utils.JsonUtils; + +import com.gkhy.safePlatform.equipment.entity.SafeMaterialDetailInfo; +import com.gkhy.safePlatform.equipment.enums.ValidStatusEnum; +import com.gkhy.safePlatform.equipment.mq.msg.SafeMaterialMsg; +import com.gkhy.safePlatform.equipment.service.baseService.SafeMaterialDetailInfoService; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.nio.charset.StandardCharsets; +import java.util.Date; + +@Service +@RocketMQMessageListener(topic = "${rocketmq.topic.safeMaterialTopic}",consumerGroup = "${rocketmq.consumer.safeMaterialGroup}") +public class SafeMaterialConsumer implements RocketMQListener<MessageExt> { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Value("${rocketmq.topic.safeMaterialTopic}") + private String safeMaterialTopic; + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + @Autowired + private SafeMaterialDetailInfoService safeMaterialDetailInfoService; + + + @Override + @Transactional + public void onMessage(MessageExt message) { + try{ + SafeMaterialMsg msg = JsonUtils.parse(new String(message.getBody(), StandardCharsets.UTF_8),SafeMaterialMsg.class); + if(null != msg && msg.getId()>0){ + //当前时间早于目标处理时间前0秒 + Long relaySeconds = (msg.getValidTime().getTime() - new Date().getTime())/1000; + if(relaySeconds>0){ + //重发消息,实现连续延时 + Message reMsg = MessageBuilder.withPayload(msg).build(); + SendResult sendResult = rocketMQTemplate.syncSend(safeMaterialTopic, reMsg, 10000, CalcuteDelayLevelUtils.calcuteDelayLevel(relaySeconds)); + if(sendResult.getSendStatus() != SendStatus.SEND_OK){ + throw new RuntimeException("【安全物资管理】重发消息失败" + msg.getId()+ "MSG_ID: "+ message.getMsgId()); + } + }else{//超时 + SafeMaterialDetailInfo materialDetailInfo = safeMaterialDetailInfoService.queryById(msg.getId()); + if(null != materialDetailInfo){ + safeMaterialDetailInfoService.updateValidStatus(msg.getId()); + logger.info("物资详细数据id:"+msg.getId()+" 物资过期!"); + + } + } + + } + }catch (Exception e){ + logger.error("【安全物资和设备管理】消息消费异常!"+ "MSG_ID: "+ message.getMsgId()+e); + } + } +} -- Gitblit v1.9.2