Java面试之消息队列如何保证消息不丢失

消息不丢失需生产者、Broker、消费者三端协同保障:开启confirm机制并配异步监听、队列与消息均持久化、消费者手动ACK,金融级系统还需本地消息表+定时对账。

消息不丢失不是靠单个配置或一次确认就能解决的,而是生产者、Broker、消费者三端协同保障的结果。漏掉任意一环,都可能在高并发或异常场景下丢消息。

开启 RabbitMQ 生产者确认机制:channel.confirmSelect()

这是防止消息“发出去但没到交换机”的第一道防线。不启用它,basicPublish 调用成功只代表 TCP 写入成功,不代表 Broker 已接收。

  • 必须在 channel 创建后、声明队列前调用 channel.confirmSelect()
  • 推荐搭配异步确认(addConfirmListener),而非阻塞式 waitForConfirms(),否则吞吐暴跌
  • 注意:仅开启 confirm 不够,还要配合 publisher-return 监听路由失败(比如 routingKey 错误导致消息被丢弃)
  • 常见错误:只加了 confirmSelect,却没注册监听器,失败时完全无感知

消息与队列都得持久化:durable=trueMessageProperties.PERSISTENT_TEXT_PLAIN

持久化是防 Broker 重启丢消息的关键。但很多人只设了队列持久化,忘了消息本身也要标记为持久化。

  • 声明队列时传 truechannel.queueDeclare("q1", true, false, false, null)
  • 发送消息时必须用持久化属性:channel.basicPublish("", "q1", MessageProperties.PERSISTENT_TEXT_PLAIN, body)
  • 交换机也建议声明为持久化(channel.exchangeDeclare("ex", "direct", true)),否则重启后绑定关系丢失,新消息路由失败
  • 坑点:Kafka 中对应的是 acks=all + min.insync.

    replicas=2
    ,不是简单设个 retention.ms

消费者必须关闭自动 ACK,改用手动 channel.basicAck()

这是最容易被跳过的环节。开 autoAck=true 意味着只要消息一推过去,RabbitMQ 就认为消费成功,哪怕你的业务逻辑还没跑完就崩了。

  • 创建消费者时指定 autoAck=falsechannel.basicConsume("q1", false, consumer)
  • 在业务处理完成、数据库事务提交之后,再调用 channel.basicAck(deliveryTag, false)
  • 如果处理失败,应明确拒绝:channel.basicNack(deliveryTag, false, true)(重入队)或转发至死信队列
  • 别在 try-catch 外层直接 ack——异常吞掉、没日志、没重试,等于静默丢消息

兜底方案:本地消息表 + 定时对账

上面三步能覆盖 95% 场景,但遇到网络分区、磁盘损坏、回调丢失等极端情况,仍可能出问题。金融/订单类系统建议加一层业务级保障。

  • 发消息前,先插入一条记录到 msg_log 表,字段至少含:msg_id(UUID)、contentexchangerouting_keystatus('sending')、next_retry_time
  • 发送成功后更新状态为 'sent';消费方处理完,通过回调或反向 MQ 通知发送方,将其改为 'confirmed'
  • 定时任务每 30 秒扫描 status = 'sent' AND next_retry_time 的记录,重新投递并更新重试时间
  • 注意:表主键必须是 msg_id,且所有写操作走同一 DB 实例,避免分布式事务复杂度

真正难的不是写几行确认代码,而是在每个环节都预设“它会失败”——网络会断、磁盘会坏、回调会丢、人会配错。所谓可靠性,就是把所有“应该发生”的事,都变成“必须验证发生”。