package com.gkhy.safePlatform.specialWork.mq.service.impl; import cn.hutool.core.util.IdUtil; import com.gkhy.safePlatform.commons.enums.ResultCodes; import com.gkhy.safePlatform.commons.exception.BusinessException; import com.gkhy.safePlatform.commons.utils.JsonUtils; import com.gkhy.safePlatform.specialWork.mq.msg.AnalysisExpireMsg; import com.gkhy.safePlatform.specialWork.mq.service.WorkAnalysisProducerService; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; @Service("workAnalysisProducerService") public class WorkAnalysisProducerServiceImpl implements WorkAnalysisProducerService { private static final String analysisExpireTopic = "analysisExpireTopic"; private static Logger log = LoggerFactory.getLogger(WorkAnalysisProducerService.class); @Autowired private RocketMQTemplate rocketMQTemplate; @Override public SendResult syncSendWorkExpireMsg(AnalysisExpireMsg message) { // 发送消息生成唯一标识 message.setMessageKey("MUKey_" + IdUtil.getSnowflake(0, 0).nextIdStr()); // 调用 template 发送同步消息 SendResult sendResult = rocketMQTemplate.syncSend(analysisExpireTopic, MessageBuilder.withPayload(message).build()); // 没发送成功 抛出异常 if (sendResult.getSendStatus() == SendStatus.SEND_OK) { return sendResult; }else{ throw new BusinessException("S0001", "发送超时任务失败"); } } @Override public SendResult syncSendWorkExpireMsgDelay(AnalysisExpireMsg message, int delayLevel) { // 发送消息生成唯一标识 message.setMessageKey("MUKey_" + IdUtil.getSnowflake(0, 0).nextIdStr()); // 调用 template 发送同步消息 SendResult sendResult = rocketMQTemplate.syncSend(analysisExpireTopic, MessageBuilder.withPayload(message).build(), 3000, delayLevel); // 没发送成功 抛出异常 if (sendResult.getSendStatus() == SendStatus.SEND_OK) { return sendResult; }else{ throw new BusinessException("S0001", "发送超时任务失败"); } } }