🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# **1. 分析数据丢失的原因** ![](https://img.kancloud.cn/99/34/9934d9a1a9db8098cdbb72b079ee4a9e_681x180.png) 可以看出,一条消息整个过程要经历两次的网络传输:**从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者**。 **在消费者未消费前存储在队列(Queue)中**。 所以可以知道,有三个场景下是会发生消息丢失的: * 存储在队列中,如果队列没有对消息持久化,RabbitMQ服务器宕机重启会丢失数据。 * 生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。 * 消费者从RabbitMQ服务器获取队列中存储的数据消费,但是消费者程序出错或者宕机而没有正确消费,导致数据丢失。 针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。 ![](https://img.kancloud.cn/00/dd/00ddeecbc84b2b44de389e59ef40efc9_685x204.png) # 2. 防丢失策略 ## 2.1 持久化 为了防止rabbitmq故障,导致数据丢失,详见消息持久化 ## 2.2 生产者 **在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据**。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解: ![](https://img.kancloud.cn/95/f5/95f58764711980a333f7a66795bf7e50_658x303.png) 从上图中可以看到是通过两个回调函数**confirm()、returnedMessage()** 进行通知。 一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数**confirm()**。第二步从Exchange路由分配到Queue中,对应回调函数则是**returnedMessage()**。 ### 2.2.1 配置生产者消费回调 ~~~text spring: rabbitmq: publisher-confirms: true # publisher-returns: true template: mandatory: true # publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者 # publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息 # spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。 ~~~ ### 2.2.2 回调函数 ~~~text @Component public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class); /** * 监听消息是否到达Exchange * * @param correlationData 包含消息的唯一标识的对象 * @param ack true 标识 ack,false 标识 nack * @param cause nack 投递失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息投递成功~消息Id:{}", correlationData.getId()); } else { logger.error("消息投递失败,Id:{},错误提示:{}", correlationData.getId(), cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息没有路由到队列,获得返回的消息"); Map map = byteToObject(message.getBody(), Map.class); logger.info("message body: {}", map == null ? "" : map.toString()); logger.info("replyCode: {}", replyCode); logger.info("replyText: {}", replyText); logger.info("exchange: {}", exchange); logger.info("routingKey: {}", exchange); logger.info("------------> end <------------"); } @SuppressWarnings("unchecked") private <T> T byteToObject(byte[] bytes, Class<T> clazz) { T t; try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bis)) { t = (T) ois.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } return t; } } ~~~ ### 2.2.3 生产者实现 ~~~text @Service public class RabbitMQServiceImpl implements RabbitMQService { @Resource private RabbitmqConfirmCallback rabbitmqConfirmCallback; @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback); //指定 ReturnCallback rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback); } @Override public String sendMsg(String msg) throws Exception { Map<String, Object> message = getMessage(msg); try { CorrelationData correlationData = (CorrelationData) message.remove("correlationData"); rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } private Map<String, Object> getMessage(String msg) { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); CorrelationData correlationData = new CorrelationData(msgId); String sendTime = sdf.format(new Date()); Map<String, Object> map = new HashMap<>(); map.put("msgId", msgId); map.put("sendTime", sendTime); map.put("msg", msg); map.put("correlationData", correlationData); return map; } } ~~~ 大功告成!接下来我们进行测试,发送一条消息,我们可以控制台: ![](https://img.kancloud.cn/6b/d4/6bd440fe1b0ac605831950c5115988e2_1250x95.png) **假设发送一条信息没有路由匹配到队列,可以看到如下信息:** ![](https://img.kancloud.cn/81/6f/816fa29a2db6cf68a1702abe994157ac_720x73.png) ## 2.3 消费端 消费者需要回复ack