随着业务数据量突破千万级,分库分表成为分布式系统架构设计的必然选择。但分库分表后,跨库事务一致性问题成为技术瓶颈 —— 传统本地事务(ACID)无法覆盖跨节点操作,网络波动、节点故障等场景易导致数据不一致,严重影响业务稳定性。
本文基于 Spring Boot 生态,结合 3 年分布式实战经验,详细拆解 5 种主流分布式事务方案的原理、代码实现、适用场景及性能对比,助力开发者快速选型落地。
分库分表后,数据被拆分至不同数据库 / 表,导致:
事务原子性失效:跨库操作无法通过单一begin/commit保证 “全成或全败”;网络不可靠性:跨节点通信存在延迟、丢包风险,易引发 “部分提交”;数据一致性冲突:如 “支付扣钱但订单未创建”“库存扣减但支付超时” 等场景。分布式事务的核心目标是在「一致性(强一致 / 最终一致)、性能、开发成本」三者间找到平衡,无需盲目追求强一致。
基于 “本地事务 + 可靠消息” 实现最终一致性,利用 MySQL 本地事务保证业务操作与消息写入的原子性,通过定时任务将消息投递至 MQ,下游消费后回调更新消息状态。
CREATE TABLE `local_message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`message_id` varchar(64) NOT NULL COMMENT '消息唯一ID(UUID)',
`business_id` varchar(64) NOT NULL COMMENT '业务ID(订单号)',
`message_content` json NOT NULL COMMENT '消息内容(商品ID、库存数等)',
`status` tinyint NOT NULL COMMENT '状态:0-待发送,1-已发送,2-已完成,3-失败',
`retry_count` int NOT NULL DEFAULT 0 COMMENT '重试次数',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`),
KEY `idx_business_id` (`business_id`),
KEY `idx_status_retry` (`status`,`retry_count`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';
// 1. 下单+写入消息(本地事务)
@Service
@Transactional
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private LocalMessageMapper messageMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(OrderDTO orderDTO) {
// ① 创建订单
Order order = new Order();
order.setOrderNo(orderDTO.getOrderNo());
order.setProductId(orderDTO.getProductId());
order.setStatus(1); // 待支付
orderMapper.insert(order);
// ② 写入本地消息
LocalMessage message = new LocalMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setBusinessId(orderDTO.getOrderNo());
message.setMessageContent(JSON.toJSONString(orderDTO));
message.setStatus(0); // 待发送
messageMapper.insert(message);
}
// 2. 定时任务发送消息(每5秒执行)
@Scheduled(cron = "0/5 * * * * ?")
public void sendMessage() {
// 查询待发送且重试次数≤3的消息
List<LocalMessage> messages = messageMapper.selectByStatusAndRetryCount(0, 3);
for (LocalMessage message : messages) {
try {
// 发送消息至RabbitMQ
rabbitTemplate.convertAndSend("order_exchange", "order.routing.key", message);
// 更新消息状态为“已发送”
message.setStatus(1);
message.setRetryCount(message.getRetryCount() + 1);
messageMapper.updateById(message);
} catch (Exception e) {
// 重试次数+1,失败后人工介入
message.setRetryCount(message.getRetryCount() + 1);
if (message.getRetryCount() >= 3) {
message.setStatus(3); // 标记失败
}
messageMapper.updateById(message);
}
}
}
// 3. 消息消费回调(更新状态为“已完成”)
@RabbitListener(queues = "stock_queue")
public void handleMessageAck(String messageId) {
LocalMessage message = messageMapper.selectByMessageId(messageId);
if (message != null) {
message.setStatus(2);
messageMapper.updateById(message);
}
}
}
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = "stock_queue")
public void deductStock(LocalMessage message) {
String businessId = message.getBusinessId();
String productId = JSON.parseObject(message.getMessageContent()).getString("productId");
// 幂等性校验:Redis分布式锁+业务唯一键
String lockKey = "stock:lock:" + businessId + ":" + productId;
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES);
if (Boolean.TRUE.equals(lock)) {
try {
// 扣减库存
stockMapper.deductStock(productId, 1);
// 回调订单服务更新消息状态
rabbitTemplate.convertAndSend("ack_exchange", "ack.routing.key", message.getMessageId());
} finally {
redisTemplate.delete(lockKey);
}
}
}
}
基于 RocketMQ 原生事务消息特性,将消息发送拆分为 “半事务消息→本地事务执行→消息确认 / 回滚” 三步,由 MQ 保证消息可靠性,解耦本地消息表。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order_producer_group
send-message-timeout: 3000
@Service
public class OrderTransactionService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 1. 发送半事务消息
public void createOrderWithTransaction(OrderDTO orderDTO) {
// 构造半事务消息
Message<OrderDTO> message = MessageBuilder.withPayload(orderDTO)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
// 发送半事务消息,指定事务监听器
rocketMQTemplate.sendMessageInTransaction(
"order_transaction_topic", // Topic名称
message,
orderDTO // 额外参数(传递至事务监听器)
);
}
// 2. 事务监听器(执行本地事务+确认/回滚消息)
@RocketMQTransactionListener(txProducerGroup = "order_producer_group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
OrderDTO orderDTO = (OrderDTO) arg;
try {
// 执行本地事务:创建订单
Order order = new Order();
order.setOrderNo(orderDTO.getOrderNo());
order.setProductId(orderDTO.getProductId());
orderMapper.insert(order);
// 本地事务成功,返回COMMIT
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 本地事务失败,返回ROLLBACK
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 3. 消息回查(兜底机制)
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderNo = JSON.parseObject(msg.getBody()).getString("orderNo");
// 查询订单是否存在
Order order = orderMapper.selectByOrderNo(orderNo);
if (order != null) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
// 4. 库存服务消费消息
@Service
public class StockConsumer {
@Autowired
private StockMapper stockMapper;
@RocketMQMessageListener(
topic = "order_transaction_topic",
consumerGroup = "stock_consumer_group"
)
public class StockListener implements RocketMQListener<OrderDTO> {
@Override
public void onMessage(OrderDTO orderDTO) {
// 扣减库存(幂等性处理同方案1)
stockMapper.deductStock(orderDTO.getProductId(), 1);
}
}
}
}
通过 “Try-Confirm-Cancel” 三步协议实现强一致:
Try:资源检查与预占(如冻结余额、锁定库存); Confirm:确认执行(正式扣减预占资源); Cancel:取消执行(回滚预占资源)。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
// 1. 定义TCC接口
public interface AccountTccService {
// Try:冻结转账金额
@TwoPhaseBusinessAction(name = "transferTry", commitMethod = "transferConfirm", rollbackMethod = "transferCancel")
void transferTry(
@BusinessActionContextParameter(paramName = "fromUserId") Long fromUserId,
@BusinessActionContextParameter(paramName = "toUserId") Long toUserId,
@BusinessActionContextParameter(paramName = "amount") BigDecimal amount,
BusinessActionContext context
);
// Confirm:确认转账
void transferConfirm(BusinessActionContext context);
// Cancel:取消转账
void transferCancel(BusinessActionContext context);
}
// 2. 实现TCC接口
@Service
public class AccountTccServiceImpl implements AccountTccService {
@Autowired
private AccountMapper accountMapper;
@Override
@Transactional
public void transferTry(Long fromUserId, Long toUserId, BigDecimal amount, BusinessActionContext context) {
// 检查余额
Account fromAccount = accountMapper.selectById(fromUserId);
if (fromAccount.getBalance().compareTo(amount) < 0) {
throw new RuntimeException("余额不足");
}
// 冻结金额(预占资源)
accountMapper.freezeBalance(fromUserId, amount);
}
@Override
@Transactional
public void transferConfirm(BusinessActionContext context) {
// 解析参数
Long fromUserId = Long.valueOf(context.getActionContext("fromUserId").toString());
Long toUserId = Long.valueOf(context.getActionContext("toUserId").toString());
BigDecimal amount = new BigDecimal(context.getActionContext("amount").toString());
// 扣减冻结金额
accountMapper.deductFrozenBalance(fromUserId, amount);
// 增加接收方余额
accountMapper.addBalance(toUserId, amount);
}
@Override
@Transactional
public void transferCancel(BusinessActionContext context) {
Long fromUserId = Long.valueOf(context.getActionContext("fromUserId").toString());
BigDecimal amount = new BigDecimal(context.getActionContext("amount").toString());
// 解冻金额(回滚预占资源)
accountMapper.unfreezeBalance(fromUserId, amount);
}
}
// 3. 调用TCC服务
@Service
public class TransferService {
@Autowired
private AccountTccService accountTccService;
public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
accountTccService.transferTry(fromUserId, toUserId, amount, new BusinessActionContext());
}
}
由协调者(Coordinator)和参与者(Participant)组成,分两阶段:
准备阶段:协调者通知所有参与者执行事务(不提交),反馈执行结果;提交阶段:所有参与者就绪则统一提交,否则全量回滚。将长事务拆分为多个短本地事务,通过 “正向事务 + 补偿事务” 实现最终一致,失败时按逆序执行补偿。
seata:
saga:
state-machine-definition: |
{
"name":"orderSaga",
"comment":"订单创建SAGA状态机",
"startState":"createOrder",
"states":[
{
"name":"createOrder",
"type":"ServiceTask",
"serviceName":"orderService",
"serviceMethod":"createOrder",
"nextState":"deductStock",
"compensateState":"cancelOrder"
},
{
"name":"deductStock",
"type":"ServiceTask",
"serviceName":"stockService",
"serviceMethod":"deductStock",
"nextState":"deductBalance",
"compensateState":"restoreStock"
},
{
"name":"deductBalance",
"type":"ServiceTask",
"serviceName":"accountService",
"serviceMethod":"deductBalance",
"endState":true,
"compensateState":"restoreBalance"
},
{
"name":"cancelOrder",
"type":"ServiceTask",
"serviceName":"orderService",
"serviceMethod":"cancelOrder",
"endState":true
},
{
"name":"restoreStock",
"type":"ServiceTask",
"serviceName":"stockService",
"serviceMethod":"restoreStock",
"nextState":"cancelOrder"
},
{
"name":"restoreBalance",
"type":"ServiceTask",
"serviceName":"accountService",
"serviceMethod":"restoreBalance",
"nextState":"restoreStock"
}
]
}
// 订单服务
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
public void createOrder(OrderDTO orderDTO) {
Order order = new Order();
order.setOrderNo(orderDTO.getOrderNo());
orderMapper.insert(order);
}
public void cancelOrder(OrderDTO orderDTO) {
orderMapper.updateStatus(orderDTO.getOrderNo(), 0); // 取消订单
}
}
// 库存服务
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
public void deductStock(OrderDTO orderDTO) {
stockMapper.deductStock(orderDTO.getProductId(), 1);
}
public void restoreStock(OrderDTO orderDTO) {
stockMapper.addStock(orderDTO.getProductId(), 1); // 补偿:恢复库存
}
}
// 账户服务
@Service
public class AccountService {
@Autowired
private AccountMapper accountMapper;
public void deductBalance(OrderDTO orderDTO) {
accountMapper.deductBalance(orderDTO.getUserId(), orderDTO.getAmount());
}
public void restoreBalance(OrderDTO orderDTO) {
accountMapper.addBalance(orderDTO.getUserId(), orderDTO.getAmount()); // 补偿:恢复余额
}
}
// 调用SAGA状态机
@Service
public class SagaOrderService {
@Autowired
private StateMachineEngine stateMachineEngine;
public void submitOrder(OrderDTO orderDTO) {
StateMachineInstance instance = stateMachineEngine.start("orderSaga", orderDTO.getOrderNo(), orderDTO);
if (!instance.isEnded()) {
throw new RuntimeException("SAGA事务执行失败");
}
}
}
|
方案 |
一致性级别 |
并发支持 |
开发成本 |
中间件依赖 |
性能 |
适用场景 |
|
本地消息表 |
最终一致 |
中低 |
低 |
无(可选 MQ) |
中 |
中小系统、非核心业务(物流、积分) |
|
事务消息 |
最终一致 |
高 |
中 |
RocketMQ(推荐) |
高 |
中大型系统、核心业务(秒杀、支付) |
|
TCC |
强一致 |
高 |
高 |
无(可选 Seata) |
高 |
金融、支付、转账等关键场景 |
|
2PC |
强一致 |
低 |
中 |
协调者 + 分布式数据库 |
低 |
传统低并发、短事务(银行对账) |
|
SAGA |
最终一致 |
高 |
中高 |
可选 Seata/Spring Cloud Stream |
中高 |
跨系统、长事务(供应链、审批) |
分布式事务无 “银弹”,选型的核心是匹配业务场景与团队能力。本文提供的 5 种方案均已在生产环境验证,可根据实际需求灵活选用。
后续将分享:
Seata TCC/SAGA 模式深度调优;事务消息与本地消息表性能对比测试;分布式事务监控与问题排查实战。欢迎在评论区交流你的落地经验或技术疑问,觉得有用的话点赞 + 收藏,关注我获取更多分布式架构干货!
#JAVA技术培训##JAVA开发##技术文档编写#