package com.gkhy.safePlatform.specialWork.mq.service.impl;
|
|
import cn.hutool.core.util.IdUtil;
|
import com.gkhy.safePlatform.commons.enums.ResultCodes;
|
import com.gkhy.safePlatform.commons.exception.BusinessException;
|
import com.gkhy.safePlatform.commons.utils.JsonUtils;
|
import com.gkhy.safePlatform.specialWork.mq.msg.AnalysisExpireMsg;
|
import com.gkhy.safePlatform.specialWork.mq.service.WorkAnalysisProducerService;
|
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendStatus;
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.messaging.Message;
|
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.stereotype.Service;
|
|
@Service("workAnalysisProducerService")
|
public class WorkAnalysisProducerServiceImpl implements WorkAnalysisProducerService {
|
|
|
private static final String analysisExpireTopic = "analysisExpireTopic";
|
|
private static Logger log = LoggerFactory.getLogger(WorkAnalysisProducerService.class);
|
|
@Autowired
|
private RocketMQTemplate rocketMQTemplate;
|
|
|
@Override
|
public SendResult syncSendWorkExpireMsg(AnalysisExpireMsg message) {
|
// 发送消息生成唯一标识
|
message.setMessageKey("MUKey_" + IdUtil.getSnowflake(0, 0).nextIdStr());
|
// 调用 template 发送同步消息
|
SendResult sendResult = rocketMQTemplate.syncSend(analysisExpireTopic, MessageBuilder.withPayload(message).build());
|
// 没发送成功 抛出异常
|
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
|
return sendResult;
|
}else{
|
throw new BusinessException("S0001", "发送超时任务失败");
|
}
|
|
}
|
|
@Override
|
public SendResult syncSendWorkExpireMsgDelay(AnalysisExpireMsg message, int delayLevel) {
|
// 发送消息生成唯一标识
|
message.setMessageKey("MUKey_" + IdUtil.getSnowflake(0, 0).nextIdStr());
|
// 调用 template 发送同步消息
|
SendResult sendResult = rocketMQTemplate.syncSend(analysisExpireTopic, MessageBuilder.withPayload(message).build(), 3000, delayLevel);
|
|
// 没发送成功 抛出异常
|
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
|
return sendResult;
|
}else{
|
throw new BusinessException("S0001", "发送超时任务失败");
|
}
|
}
|
}
|