在分布式系统架构中,RocketMQ 凭借高吞吐、低延迟、高可用的特性成为主流消息队列,但消息丢失始终是开发者绕不开的核心痛点。一旦消息丢失,可能引发订单状态异常、数据不一致等严重业务问题。本文将从生产者、Broker、消费者三个核心阶段拆解消息丢失的根因,结合底层逻辑和实战代码,给出可落地的根治方案。
生产者发送消息后,若 Broker 未正确接收 / 响应,或生产者未处理发送结果,都会导致消息丢失。常见场景:
Oneway 发送:无任何确认机制,消息发送后无法确认 Broker 是否接收;同步发送网络抖动:Broker 已接收消息但响应丢失,生产者误以为发送失败;异步发送回调未处理:异常回调未记录,失败消息无法重试;无本地兜底:发送失败后无持久化,重启后消息彻底丢失。
<dependencies>
<!-- Spring Boot web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.0</version>
</dependency>
<!-- Bean Validation provider: supplies jakarta.validation (@NotBlank) used by the producer service;
     spring-boot-starter-web no longer includes it -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>3.2.0</version>
</dependency>
<!-- RocketMQ Spring integration -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.3</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- MyBatis-Plus: Spring Boot 3 projects must use the boot3 starter. The classic
     mybatis-plus-boot-starter pulls mybatis-spring 2.x, which fails on Spring Framework 6.1 (Boot 3.2). -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- MySQL driver: mysql:mysql-connector-java has been relocated to com.mysql:mysql-connector-j -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
<scope>runtime</scope>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.43</version>
</dependency>
<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
<!-- Swagger 3 annotations -->
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
<version>2.2.18</version>
</dependency>
</dependencies>
@Configuration
@Slf4j
public class RocketMQProducerConfig {

    /** NameServer address(es), read from {@code rocketmq.name-server}. */
    @Value("${rocketmq.name-server}")
    private String nameServer;

    /** Producer group name, read from {@code rocketmq.producer.group}. */
    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    /**
     * Builds and starts the shared {@link DefaultMQProducer}.
     *
     * <p>Both synchronous and asynchronous sends are retried up to 3 times by the client
     * (the synchronous default is 2).
     *
     * @return a started producer bound to {@code producerGroup}
     * @throws MQClientException if the producer fails to start
     */
    @Bean
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        final int sendRetries = 3;
        final DefaultMQProducer mqProducer = new DefaultMQProducer(producerGroup);
        mqProducer.setNamesrvAddr(nameServer);
        mqProducer.setRetryTimesWhenSendFailed(sendRetries);
        mqProducer.setRetryTimesWhenSendAsyncFailed(sendRetries);

        mqProducer.start();
        log.info("RocketMQ生产者启动成功,group:{}", producerGroup);
        return mqProducer;
    }
}
-- Local message table: producer-side fallback store for outgoing RocketMQ messages.
-- The producer inserts a PENDING row before sending and updates it with the send outcome;
-- the retry task re-sends rows in PENDING/FAILED status whose send_times is below the retry limit.
CREATE TABLE `local_message` (
`message_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
-- Business idempotency key; uk_biz_id below allows only one row per biz_id.
`biz_id` varchar(64) NOT NULL COMMENT '业务唯一标识(幂等键)',
`topic` varchar(64) NOT NULL COMMENT 'RocketMQ主题',
`tags` varchar(32) DEFAULT NULL COMMENT '标签',
`message_content` text NOT NULL COMMENT '消息内容',
`status` varchar(16) NOT NULL COMMENT '状态:PENDING/FAILED/SUCCESS/RETRYING',
`msg_id` varchar(64) DEFAULT NULL COMMENT 'RocketMQ返回的消息ID',
`send_times` int NOT NULL DEFAULT '0' COMMENT '发送次数',
-- NOTE(review): writers store exception messages here; they must be truncated to 255
-- characters (or the column widened), otherwise strict-mode MySQL rejects the update.
`error_msg` varchar(255) DEFAULT NULL COMMENT '错误信息',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL COMMENT '更新时间',
PRIMARY KEY (`message_id`),
UNIQUE KEY `uk_biz_id` (`biz_id`),
-- Serves the retry scan, which filters by status and send_times.
KEY `idx_status_send_times` (`status`,`send_times`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表(生产端兜底)';
@Service
@Slf4j
public class RocketMQProducerService {
    /** Capacity of the {@code local_message.error_msg} column (varchar(255)). */
    private static final int ERROR_MSG_MAX_LENGTH = 255;

    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private LocalMessageMapper localMessageMapper;

    /**
     * Sends a message synchronously, backed by the local message table.
     *
     * <p>A PENDING row is inserted before the send and updated to SUCCESS/FAILED afterwards. A failed
     * send therefore leaves a FAILED row behind for the scheduled retry task to pick up.
     *
     * <p>Intentionally NOT {@code @Transactional(rollbackFor = Exception.class)}. Such a transaction
     * rolls back when the send failure is rethrown, which discards both the inserted row and its
     * FAILED status, so the failed message could never be retried. When a caller already runs a
     * transaction, the Spring-managed mapper calls presumably join it and the caller decides the
     * outcome.
     *
     * <p>The {@code @NotBlank} annotations are only enforced when the bean is {@code @Validated}.
     * The explicit checks below enforce them regardless.
     *
     * @param topic          RocketMQ topic; must not be blank
     * @param tags           message tag, may be null
     * @param messageContent message body, sent as UTF-8 bytes; must not be blank
     * @param bizId          business unique ID (idempotency key), also sent as user property "bizId";
     *                       must not be blank
     * @return the broker's send result (status SEND_OK)
     * @throws IllegalArgumentException if topic, messageContent or bizId is null or blank
     * @throws Exception the client-side send failure, or a RuntimeException when the broker answers
     *                   with a status other than SEND_OK
     */
    public SendResult sendSyncMessage(
            @NotBlank(message = "主题不能为空") String topic,
            String tags,
            @NotBlank(message = "消息内容不能为空") String messageContent,
            @NotBlank(message = "业务ID不能为空") String bizId) throws Exception {
        requireText(topic, "主题不能为空");
        requireText(messageContent, "消息内容不能为空");
        requireText(bizId, "业务ID不能为空");
        // 1. Persist the local record first so the message is traceable/retryable.
        LocalMessageDO localMessage = newPendingMessage(topic, tags, messageContent, bizId);
        localMessageMapper.insert(localMessage);
        // 2. Build the RocketMQ message (bizId property lets consumers deduplicate).
        Message rocketMsg = buildRocketMessage(topic, tags, messageContent, bizId);
        // 3. Synchronous send (the client retries internally).
        SendResult sendResult;
        try {
            sendResult = defaultMQProducer.send(rocketMsg);
            // 4. Record the outcome.
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                localMessage.setStatus(MessageStatus.SUCCESS.name());
                localMessage.setMsgId(sendResult.getMsgId());
                log.info("同步消息发送成功,bizId:{}, msgId:{}", bizId, sendResult.getMsgId());
            } else {
                localMessage.setStatus(MessageStatus.FAILED.name());
                localMessage.setErrorMsg(sendResult.getSendStatus().name());
                log.error("同步消息发送失败,bizId:{}, 状态:{}", bizId, sendResult.getSendStatus());
                throw new RuntimeException("消息发送失败:" + sendResult.getSendStatus());
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            }
            localMessage.setStatus(MessageStatus.FAILED.name());
            localMessage.setErrorMsg(truncateErrorMsg(e.getMessage()));
            log.error("同步消息发送异常,bizId:{}", bizId, e);
            throw e;
        } finally {
            localMessage.setUpdateTime(new Date());
            localMessageMapper.updateById(localMessage);
        }
        return sendResult;
    }

    /**
     * Sends a message asynchronously, backed by the local message table.
     *
     * <p>The PENDING row is inserted before the send, and the send callback updates it to
     * SUCCESS/FAILED. Failures are recorded rather than thrown.
     *
     * <p>Intentionally NOT {@code @Transactional}. The callback can run before this method returns.
     * If the insert were still inside an uncommitted transaction, the callback's update could race
     * with (or block on) that transaction.
     *
     * @param topic          RocketMQ topic
     * @param tags           message tag, may be null
     * @param messageContent message body, sent as UTF-8 bytes
     * @param bizId          business unique ID, also sent as user property "bizId"
     */
    public void sendAsyncMessage(String topic, String tags, String messageContent, String bizId) {
        // 1. Persist the local record.
        LocalMessageDO localMessage = newPendingMessage(topic, tags, messageContent, bizId);
        localMessageMapper.insert(localMessage);
        // 2. Build the message.
        Message rocketMsg = buildRocketMessage(topic, tags, messageContent, bizId);
        // 3. Asynchronous send; the callback records the outcome.
        try {
            defaultMQProducer.send(rocketMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                        localMessage.setStatus(MessageStatus.SUCCESS.name());
                        localMessage.setMsgId(sendResult.getMsgId());
                        log.info("异步消息发送成功,bizId:{}, msgId:{}", bizId, sendResult.getMsgId());
                    } else {
                        localMessage.setStatus(MessageStatus.FAILED.name());
                        localMessage.setErrorMsg(sendResult.getSendStatus().name());
                        log.error("异步消息发送失败,bizId:{}, 状态:{}", bizId, sendResult.getSendStatus());
                    }
                    localMessage.setUpdateTime(new Date());
                    localMessageMapper.updateById(localMessage);
                }

                @Override
                public void onException(Throwable e) {
                    localMessage.setStatus(MessageStatus.FAILED.name());
                    localMessage.setErrorMsg(truncateErrorMsg(e.getMessage()));
                    localMessage.setUpdateTime(new Date());
                    localMessageMapper.updateById(localMessage);
                    log.error("异步消息发送异常,bizId:{}", bizId, e);
                }
            });
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            }
            localMessage.setStatus(MessageStatus.FAILED.name());
            localMessage.setErrorMsg(truncateErrorMsg(e.getMessage()));
            localMessage.setUpdateTime(new Date());
            localMessageMapper.updateById(localMessage);
            log.error("异步消息初始化异常,bizId:{}", bizId, e);
        }
    }

    /** Creates a PENDING local-message row with zero send attempts. */
    private static LocalMessageDO newPendingMessage(String topic, String tags, String messageContent, String bizId) {
        Date now = new Date();
        return LocalMessageDO.builder()
                .messageId(UUID.randomUUID().toString())
                .bizId(bizId)
                .topic(topic)
                .tags(tags)
                .messageContent(messageContent)
                .status(MessageStatus.PENDING.name())
                .sendTimes(0)
                .createTime(now)
                .updateTime(now)
                .build();
    }

    /** Builds the RocketMQ message with the UTF-8 body and the "bizId" user property. */
    private static Message buildRocketMessage(String topic, String tags, String messageContent, String bizId) {
        Message rocketMsg = new Message(topic, tags, messageContent.getBytes(StandardCharsets.UTF_8));
        rocketMsg.putUserProperty("bizId", bizId);
        return rocketMsg;
    }

    /** Throws IllegalArgumentException with {@code message} when {@code value} is null or blank. */
    private static void requireText(String value, String message) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Cuts an error message to the error_msg column size so the status update cannot fail with
     * "data too long". It never splits a surrogate pair, since a lone surrogate is not valid utf8mb4.
     */
    private static String truncateErrorMsg(String errorMsg) {
        if (errorMsg == null || errorMsg.length() <= ERROR_MSG_MAX_LENGTH) {
            return errorMsg;
        }
        int end = ERROR_MSG_MAX_LENGTH;
        if (Character.isHighSurrogate(errorMsg.charAt(end - 1))) {
            end--;
        }
        return errorMsg.substring(0, end);
    }
}
@Component
@Slf4j
public class MessageRetryTask {
@Resource
private LocalMessageMapper localMessageMapper;
@Resource
private DefaultMQProducer defaultMQProducer;
@Value("${rocketmq.producer.max-retry-times:3}")
private Integer maxRetryTimes;
/**
* 每5分钟重试失败消息
* @author ken
*/
@Scheduled(cron = "0 */5 * * * ?")
public void retryFailedMessages() {
log.info("开始重试失败消息,最大重试次数:{}", maxRetryTimes);
// 查询待重试消息:状态为PENDING/FAILED且发送次数<最大次数
List<LocalMessageDO> retryList = localMessageMapper.listRetryMessages(maxRetryTimes);
if (CollectionUtils.isEmpty(retryList)) {
log.info("暂无待重试消息");
return;
}
for (LocalMessageDO msg : retryList) {
try {
// 更新发送状态和次数
msg.setSendTimes(msg.getSendTimes() + 1);
msg.setStatus(MessageStatus.RETRYING.name());
msg.setUpdateTime(new Date());
localMessageMapper.updateById(msg);
// 重新发送消息
Message rocketMsg = new Message(msg.getTopic(), msg.getTags(), msg.getMessageContent().getBytes(StandardCharsets.UTF_8));
rocketMsg.putUserProperty("bizId", msg.getBizId());
SendResult sendResult = defaultMQProducer.send(rocketMsg);
// 更新结果
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
msg.setStatus(MessageStatus.SUCCESS.name());
msg.setMsgId(sendResult.getMsgId());
msg.setErrorMsg(null);
log.info("消息重试成功,bizId:{}, msgId:{}", msg.getBizId(), sendResult.getMsgId());
} else {
msg.setStatus(MessageStatus.FAILED.name());
msg.setErrorMsg(sendResult.getSendStatus().name());
log.error("消息重试失败,bizId:{}, 状态:{}", msg.getBizId(), sendResult.getSendStatus());
}
} catch (Exception e) {
msg.setStatus(MessageStatus.FAILED.name());
msg.setErrorMsg(e.getMessage());
log.error("消息重试异常,bizId:{}", msg.getBizId(), e);
} finally {
msg.setUpdateTime(new Date());
localMessageMapper.updateById(msg);
}
}
log.info("重试任务完成,处理条数:{}", retryList.size());
}
}

Broker 接收到消息后,若未持久化到磁盘或未同步到从节点就宕机,消息会丢失。核心场景:
异步刷盘:消息写入页缓存后 Broker 宕机,未刷盘到磁盘;异步复制:主节点接收消息后未同步到从节点,主节点宕机;磁盘满 / 损坏:无法写入 commitLog,消息丢失。
# Broker role: ASYNC_MASTER / SYNC_MASTER / SLAVE.
# SYNC_MASTER makes the master wait for the slave to replicate a message before acknowledging it.
# There is no separate replication-mode key, and plain "MASTER" is not a valid role.
brokerRole=SYNC_MASTER
# Flush policy: SYNC_FLUSH (flush to disk before ack) / ASYNC_FLUSH (flush in background)
flushDiskType=SYNC_FLUSH
# Allow reads to be served from the slave when the master is under memory pressure
slaveReadEnable=true
# commitLog store path
storePathCommitLog=/data/rocketmq/commitlog
# consumeQueue store path
storePathConsumeQueue=/data/rocketmq/consumequeue
# Max disk usage (%) of the store; above it the broker deletes expired commitLog files.
# Rejecting writes is governed separately by the broker's disk-space warning level.
diskMaxUsedSpaceRatio=85
# Timeout (ms) for a SYNC_FLUSH flush to complete.
# NOTE: verify which key bounds the SYNC_MASTER slave-ack wait in your RocketMQ version.
syncFlushTimeout=5000

@Component
@Slf4j
public class BrokerDiskMonitor {
@Value("${rocketmq.broker.store-path}")
private String storePath;
/**
* 每分钟检查磁盘使用率
* @author ken
*/
@Scheduled(cron = "0 */1 * * * ?")
public void checkDiskUsage() {
File storeDir = new File(storePath);
if (!storeDir.exists()) {
log.error("Broker存储目录不存在:{}", storePath);
return;
}
// 获取磁盘信息
FileStore store = FileStoreUtils.getFileStore(storeDir);
long totalSpace = store.getTotalSpace();
long freeSpace = store.getUsableSpace();
double usedRatio = 1 - (double) freeSpace / totalSpace;
log.info("Broker磁盘监控:总空间{}GB,剩余{}GB,使用率{:.2%}",
totalSpace / 1024 / 1024 / 1024,
freeSpace / 1024 / 1024 / 1024,
usedRatio);
// 超过阈值告警
if (usedRatio > 0.85) {
log.error("磁盘使用率超过阈值(85%),请及时清理!");
// 这里可接入告警系统(如钉钉/邮件)
}
}
}
消费者接收到消息后,若未处理完成就提交 offset,或处理失败后未重试,会导致消息丢失。核心场景:
提前确认消费:消息刚接收就返回消费成功(例如把消息交给异步线程后立即返回 CONSUME_SUCCESS),offset 随之推进,处理失败后无法重新消费;无幂等处理:重复消费导致业务异常,开发者不敢重试;重试次数不足:消费失败后直接丢弃;无死信队列:无法处理的消息无兜底。
@Configuration
@Slf4j
public class RocketMQConsumerConfig {
    @Value("${rocketmq.name-server}")
    private String nameServer;
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    /** Topic to subscribe to; defaults to the previously hard-coded {@code order_topic}. */
    @Value("${rocketmq.consumer.topic:order_topic}")
    private String topic;

    /**
     * Registers the order listener as a Spring bean so that its {@code @Resource} fields are injected.
     * A listener created with {@code new} outside the container would keep a null OrderService.
     */
    @Bean
    public OrderMessageConsumer orderMessageConsumer() {
        return new OrderMessageConsumer();
    }

    /**
     * Builds and starts the push consumer for {@code topic}.
     *
     * <p>Push consumers have no auto-commit switch. The consume offset only advances for messages
     * whose listener call returns CONSUME_SUCCESS. RECONSUME_LATER sends the message to the retry
     * queue, and after {@code maxReconsumeTimes} failed retries it goes to the dead-letter queue.
     *
     * @return a started consumer
     * @throws MQClientException if subscribing or starting fails
     */
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServer);
        // Clustering: each message is consumed by one instance of the group.
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // Consume thread pool size.
        consumer.setConsumeThreadMin(5);
        consumer.setConsumeThreadMax(10);
        // One message per listener call (also the client default), made explicit for the listener.
        consumer.setConsumeMessageBatchMaxSize(1);
        // Retries before the message is moved to the dead-letter queue.
        consumer.setMaxReconsumeTimes(3);
        consumer.subscribe(topic, "*");
        // Calling the @Bean method through the @Configuration proxy returns the container-managed instance.
        consumer.registerMessageListener(orderMessageConsumer());
        consumer.start();
        log.info("RocketMQ消费者启动成功,group:{}", consumerGroup);
        return consumer;
    }
}
@Slf4j
public class OrderMessageConsumer implements MessageListenerConcurrently {
@Resource
private OrderService orderService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
String msgId = msg.getMsgId();
String bizId = msg.getUserProperty("bizId");
String content = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("接收消息,msgId:{}, bizId:{}, 重试次数:{}", msgId, bizId, msg.getReconsumeTimes());
try {
// 1. 幂等检查:业务ID是否已处理
if (orderService.checkBizProcessed(bizId)) {
log.info("消息已处理(幂等),bizId:{}", bizId);
// 手动提交offset
commitOffset(context, msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 2. 处理业务逻辑
orderService.processOrder(content, bizId);
// 3. 记录幂等标识
orderService.recordBizProcessed(bizId);
// 4. 手动提交offset(确保处理完成)
commitOffset(context, msg);
log.info("消息处理成功,bizId:{}", bizId);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消息处理失败,msgId:{}, bizId:{}", msgId, bizId, e);
// 重试次数用尽则进入死信
if (msg.getReconsumeTimes() >= 3) {
log.error("消息重试次数用尽,进入死信队列,bizId:{}", bizId);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 重试消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
/**
* 手动提交offset
* @author ken
*/
private void commitOffset(ConsumeConcurrentlyContext context, MessageExt msg) {
try {
// 获取当前队列
MessageQueue queue = msg.getMessageQueue();
long offset = msg.getQueueOffset() + 1;
// 更新并持久化offset
context.getMessageQueueListener().updateOffset(queue, offset);
log.info("手动提交offset成功,queue:{}, offset:{}", queue, offset);
} catch (Exception e) {
log.error("手动提交offset失败,msgId:{}", msg.getMsgId(), e);
}
}
}
@Service
@Slf4j
public class OrderService {
    @Resource
    private OrderMapper orderMapper;

    /**
     * Returns whether an order with the given business ID already exists.
     *
     * <p>Uses a count query rather than selectOne, because selectOne throws when more than one row
     * matches. That would make every redelivery of such a bizId fail.
     *
     * @param bizId business unique ID
     * @return true if at least one order carries this bizId
     */
    public boolean checkBizProcessed(String bizId) {
        return orderMapper.selectCount(new LambdaQueryWrapper<OrderDO>().eq(OrderDO::getBizId, bizId)) > 0;
    }

    /**
     * Records that a business ID has been processed.
     *
     * <p>Currently only logs. The order row written by {@link #processOrder} is what
     * {@link #checkBizProcessed} detects. A real implementation could update the order status or
     * insert into an idempotency table.
     *
     * @param bizId business unique ID
     */
    public void recordBizProcessed(String bizId) {
        log.info("记录业务ID处理完成,bizId:{}", bizId);
    }

    /**
     * Parses the order JSON and inserts a PENDING order for {@code bizId} in one transaction.
     *
     * <p>NOTE(review): the caller's check-then-insert is not atomic, so concurrent duplicate
     * deliveries can both reach this insert. A unique index on the order's biz_id column is
     * presumably needed for strict idempotency. Confirm the schema.
     *
     * @param content order JSON (mapped to OrderDTO)
     * @param bizId   business unique ID stored on the order
     * @throws IllegalArgumentException if the content parses to an empty order
     */
    @Transactional(rollbackFor = Exception.class)
    public void processOrder(String content, String bizId) {
        OrderDTO orderDTO = JSON.parseObject(content, OrderDTO.class);
        if (ObjectUtils.isEmpty(orderDTO)) {
            throw new IllegalArgumentException("订单信息为空");
        }
        Date now = new Date();
        OrderDO order = OrderDO.builder()
                .orderId(UUID.randomUUID().toString())
                .bizId(bizId)
                .userId(orderDTO.getUserId())
                .productId(orderDTO.getProductId())
                .amount(orderDTO.getAmount())
                .status(OrderStatus.PENDING.name())
                .createTime(now)
                .updateTime(now)
                .build();
        orderMapper.insert(order);
        log.info("订单创建成功,orderId:{}, bizId:{}", order.getOrderId(), bizId);
    }
}
@Service
@Slf4j
@RocketMQMessageListener(
        // RocketMQ names a group's dead-letter topic "%DLQ%" + <consumer group>, so this must point at
        // the group whose failures should be handled (rocketmq.consumer.group).
        // NOTE(review): depending on the RocketMQ version, the DLQ topic may be created write-only
        // (perm=2); change it to 6 with `mqadmin updateTopicPerm` before it can be consumed.
        topic = "%DLQ%${rocketmq.consumer.group}",
        consumerGroup = "dlq_consumer_group",
        consumeMode = ConsumeMode.CONCURRENTLY
)
public class DeadLetterConsumer implements RocketMQListener<MessageExt> {

    /**
     * Logs a dead-lettered message and hands it to {@link #saveDeadLetterMessage} for manual handling.
     *
     * @param msg the dead-lettered message
     */
    @Override
    public void onMessage(MessageExt msg) {
        String msgId = msg.getMsgId();
        String bizId = msg.getUserProperty("bizId");
        String content = new String(msg.getBody(), StandardCharsets.UTF_8);
        log.error("死信队列消息:msgId:{}, bizId:{}, 内容:{}, 重试次数:{}",
                msgId, bizId, content, msg.getReconsumeTimes());
        // MessageExt carries no consumer-side failure reason, so no error message is available here.
        saveDeadLetterMessage(msgId, bizId, content, msg.getReconsumeTimes(), null);
    }

    /**
     * Persists a dead-letter record; currently only logs.
     *
     * @param msgId      message ID
     * @param bizId      business unique ID, may be null
     * @param content    message body
     * @param retryTimes reconsume count
     * @param errorMsg   failure reason, may be null
     */
    private void saveDeadLetterMessage(String msgId, String bizId, String content, Integer retryTimes, String errorMsg) {
        // A real implementation would insert into a dead-letter table.
        log.info("保存死信消息:msgId:{}, bizId:{}", msgId, bizId);
    }
}

RocketMQ 消息丢失的本质是 “未确认的状态变更”—— 无论是生产者未确认 Broker 接收、Broker 未确认持久化 / 同步、消费者未确认处理完成,都会导致消息丢失。要实现 “零丢失”,需遵循三大原则:
生产端:本地兜底 + 重试。通过本地消息表确保消息可追溯,发送失败后定时重试,实现“至少发送一次”。
Broker 端:持久化 + 主从同步。同步刷盘确保消息落地,同步复制确保主从数据一致,避免单点故障导致丢失。
消费端:处理完成再确认 + 幂等。业务处理完成后才返回消费成功(推进 offset),幂等设计使重复消费无害,重试 + 死信队列兜底异常消息。
结合以上方案,可构建从生产到消费的全链路可靠消息系统,彻底解决 RocketMQ 消息丢失问题。
¥50.00
E英语视听说教程2智慧版 含2个激活码 赵英俊 焦绘宏 外研社 9787521327281
¥28.80
现代大学英语精读2 第三版 学生用书 同步测试 全2册 封底含数字课程激活码 9787521331899 杨立民 徐克容 外研社 现代大学英语
¥143.00
正版新书 新视野大学英语阅读教程1智慧版+阅读教程2智慧版+阅读教程3智慧版+阅读教程4智慧版 激活码 外研社
¥19.90
2022年小黑课堂计算机一级ms office基础及应用上机题库激活码小黑老师通关秘籍无纸化教程真题全国计算机等级考试教材1级msoffice
¥34.80
未来教育 备考2023年3月 全国计算机等级考试一级office教程全套 大学模拟模考软件1级真题无纸化教材激活码office上机题库2022
¥25.00
现货 杨立民 现代大学英语精读1(含数字课程激活码)第3版精读1第三版学生用书 杨立民 徐克容 大学教材外语教学与研究出版社