package com.gkhy.safePlatform.specialWork.common; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionSynchronizationAdapter; @Component public class RocketMQSpecialWorkTemplateHelper { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 事务提交后发送MQ * @param message * @param */ public void syncSend(String destination, T message) { // 是否开启事务判断 if (org.springframework.transaction.support.TransactionSynchronizationManager.isSynchronizationActive()) { org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { SendResult sendResult = rocketMQTemplate.syncSend(destination, message); if(sendResult.getSendStatus() != SendStatus.SEND_OK){ throw new RuntimeException("【特殊作业】发送MQ的消息失败"+message); } } }); } else { SendResult sendResult = rocketMQTemplate.syncSend(destination, message); if(sendResult.getSendStatus() != SendStatus.SEND_OK){ throw new RuntimeException("【特殊作业】发送MQ的消息失败"+message); } } } }