# 前言:事务管理的艺术与哲学
在我多年的开发经历中,事务管理一直是一个既熟悉又陌生的话题。熟悉的是,几乎每个项目都需要用到事务;陌生的是,真正理解事务本质的人却寥寥无几。我见过太多因为事务使用不当而导致的系统故障,也见证过优秀的事务设计如何让系统更加健壮。
记得有一次,我接手了一个电商系统,发现订单处理逻辑中存在严重的事务问题:用户下单时,库存扣减和订单创建不在同一个事务中,导致超卖现象频发。这个问题不仅造成了经济损失,更严重影响了用户体验。通过重新设计事务边界,我们彻底解决了这个问题。
Spring 的事务管理机制是 Java 企业级开发中最优雅的设计之一。它将复杂的事务处理逻辑封装起来,让开发者能够专注于业务逻辑。但正是这种封装,让很多开发者对事务的内部原理缺乏深入理解。
今天,我想和大家分享我对 Spring 事务管理的深度思考和实战经验,希望能帮助你真正掌握这门重要的技术。
# 事务基础理论:ACID 与隔离级别
# 什么是事务?
事务是数据库操作的基本单位,它是一个不可分割的工作序列。想象一下银行转账的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void transfer(Long fromId, Long toId, BigDecimal amount) { Account fromAccount = accountDao.findById(fromId); if (fromAccount.getBalance().compareTo(amount) < 0) { throw new InsufficientBalanceException("余额不足"); } fromAccount.setBalance(fromAccount.getBalance().subtract(amount)); accountDao.update(fromAccount); Account toAccount = accountDao.findById(toId); toAccount.setBalance(toAccount.getBalance().add(amount)); accountDao.update(toAccount); }
|
如果没有事务保护,当第 2 步执行成功但第 3 步失败时,钱就会凭空消失!事务确保这组操作要么全部成功,要么全部失败。
# ACID 四大特性
# 1. 原子性(Atomicity)
事务中的所有操作要么全部完成,要么全部不完成。
1 2 3 4 5 6
| @Transactional public void transfer(Long fromId, Long toId, BigDecimal amount) { deductBalance(fromId, amount); addBalance(toId, amount); }
|
# 2. 一致性(Consistency)
事务执行前后,数据库都处于一致的状态。
1 2 3 4 5 6 7 8 9 10 11
| @Transactional public void placeOrder(Order order) { BigDecimal totalAmount = order.getUnitPrice().multiply(BigDecimal.valueOf(order.getQuantity())); if (!order.getTotalAmount().equals(totalAmount)) { throw new BusinessException("订单金额不一致"); } orderDao.save(order); inventoryDao.reduceStock(order.getProductId(), order.getQuantity()); }
|
# 3. 隔离性(Isolation)
并发事务之间相互隔离,不会相互干扰。
# 4. 持久性(Durability)
事务一旦提交,对数据库的修改就是永久性的。
# 事务隔离级别
不同的隔离级别在并发性能和数据一致性之间做权衡:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@Service public class TransactionIsolationDemo {
@Transactional(isolation = Isolation.READ_UNCOMMITTED) public void readUncommitted() { Product product = productDao.findById(1L); System.out.println("读取到未提交的数据:" + product.getPrice()); }
@Transactional(isolation = Isolation.READ_COMMITTED) public void readCommitted() { Product product = productDao.findById(1L); System.out.println("读取到已提交的数据:" + product.getPrice()); }
@Transactional(isolation = Isolation.REPEATABLE_READ) public void repeatableRead() { Product product1 = productDao.findById(1L); try { Thread.sleep(3000); } catch (InterruptedException e) {} Product product2 = productDao.findById(1L); System.out.println("两次读取结果相同:" + product1.getPrice().equals(product2.getPrice())); }
@Transactional(isolation = Isolation.SERIALIZABLE) public void serializable() { List<Product> products = productDao.findAll(); System.out.println("串行化读取:" + products.size()); } }
|
# Spring 事务管理核心机制
Spring 事务管理的核心是 PlatformTransactionManager 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public interface PlatformTransactionManager {
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException; }
|
Spring 提供了多种事务管理器实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
@Configuration public class TransactionConfig {
@Bean public PlatformTransactionManager jdbcTransactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); }
@Bean public PlatformTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) { return new JpaTransactionManager(entityManagerFactory); }
@Bean public PlatformTransactionManager mybatisTransactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); }
@Bean public PlatformTransactionManager jtaTransactionManager() { return new JtaTransactionManager(); } }
|
# 事务定义 TransactionDefinition
TransactionDefinition 定义了事务的各种属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public class TransactionDefinitionDemo {
public void programmaticTransaction() { DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); definition.setTimeout(30); definition.setReadOnly(true); definition.setName("programmaticTransaction"); } }
|
# 事务传播行为
事务传播行为定义了事务方法之间的调用关系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
@Service public class TransactionPropagationDemo {
@Transactional(propagation = Propagation.REQUIRED) public void requiredMethod() { innerMethod(); } @Transactional(propagation = Propagation.REQUIRED) public void innerMethod() { }
@Transactional(propagation = Propagation.REQUIRES_NEW) public void requiresNewMethod() { logOperation(); } @Transactional(propagation = Propagation.REQUIRES_NEW) public void logOperation() { }
@Transactional(propagation = Propagation.SUPPORTS) public void supportsMethod() { }
@Transactional(propagation = Propagation.NOT_SUPPORTED) public void notSupportedMethod() { }
@Transactional(propagation = Propagation.MANDATORY) public void mandatoryMethod() { }
@Transactional(propagation = Propagation.NEVER) public void neverMethod() { }
@Transactional(propagation = Propagation.NESTED) public void nestedMethod() { try { riskyOperation(); } catch (Exception e) { log.error("嵌套事务失败,但不影响外部事务", e); } } public void riskyOperation() { } }
|
# 声明式事务实战
# 基础声明式事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
@Service @Slf4j public class BasicTransactionService { @Autowired private OrderRepository orderRepository; @Autowired private InventoryRepository inventoryRepository;
@Transactional public Order createOrder(Order order) { log.info("开始创建订单:{}", order.getId()); Order savedOrder = orderRepository.save(order); inventoryRepository.reduceStock(order.getProductId(), order.getQuantity()); log.info("订单创建完成:{}", savedOrder.getId()); return savedOrder; }
@Transactional(readOnly = true) public Order getOrder(Long orderId) { return orderRepository.findById(orderId) .orElseThrow(() -> new BusinessException("订单不存在")); }
@Transactional(timeout = 30) public void batchProcess(List<Order> orders) { for (Order order : orders) { processOrder(order); } }
@Transactional(rollbackFor = {BusinessException.class, SystemException.class}) public void riskyOperation() throws BusinessException { if (someCondition()) { throw new BusinessException("业务异常,会触发回滚"); } }
@Transactional(noRollbackFor = {BusinessWarningException.class}) public void warningOperation() throws BusinessWarningException { if (warningCondition()) { throw new BusinessWarningException("业务警告,不触发回滚"); } } private boolean someCondition() { return true; } private boolean warningCondition() { return true; } private void processOrder(Order order) { } }
|
# 复杂事务场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
@Service @Slf4j public class ComplexTransactionService { @Autowired private UserRepository userRepository; @Autowired private AccountRepository accountRepository; @Autowired private TransactionTemplate transactionTemplate;
@Transactional public void updateUserWithNonTransactionalMethod(User user) { userRepository.save(user); updateAccountInfo(user); }
public void updateAccountInfo(User user) { Account account = accountRepository.findByUserId(user.getId()); account.setUpdateTime(new Date()); accountRepository.save(account); }
@Autowired private ComplexTransactionService self; @Transactional public void updateWithSelfCall(User user) { userRepository.save(user); self.updateAccountInTransaction(user); } @Transactional public void updateAccountInTransaction(User user) { Account account = accountRepository.findByUserId(user.getId()); account.setUpdateTime(new Date()); accountRepository.save(account); }
@Transactional public void updateWithAopContext(User user) { userRepository.save(user); ((ComplexTransactionService) AopContext.currentProxy()) .updateAccountInTransaction(user); }
public void programmaticTransaction(User user) { transactionTemplate.execute(status -> { try { userRepository.save(user); updateAccountInfo(user); return "success"; } catch (Exception e) { status.setRollbackOnly(); return "failed"; } }); }
@Transactional public void nestedTransactionExample(User user) { userRepository.save(user); try { nestedOperation(user); } catch (Exception e) { log.error("嵌套事务失败,但不影响主事务", e); } user.setStatus("COMPLETED"); userRepository.save(user); } @Transactional(propagation = Propagation.NESTED) public void nestedOperation(User user) { Account account = new Account(); account.setUserId(user.getId()); account.setBalance(BigDecimal.ZERO); accountRepository.save(account); if (account.getBalance().equals(BigDecimal.ZERO)) { throw new BusinessException("账户余额不能为零"); } }
@Transactional public void propagationBehaviorExample(User user) { userRepository.save(user); requiresNewLog(user.getId()); nestedValidation(user); supportsOperation(user); } @Transactional(propagation = Propagation.REQUIRES_NEW) public void requiresNewLog(Long userId) { logRepository.insertLog(userId, "用户操作", new Date()); } @Transactional(propagation = Propagation.NESTED) public void nestedValidation(User user) { if (user.getEmail() == null) { throw new BusinessException("邮箱不能为空"); } } @Transactional(propagation = Propagation.SUPPORTS) public void supportsOperation(User user) { cacheService.refreshUserCache(user.getId()); } @Autowired private LogRepository logRepository; @Autowired private CacheService cacheService; }
|
# 事务失效的常见原因与解决方案
# 1. 访问权限问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Service public class TransactionFailureExample1 { @Transactional private void privateMethod() { }
@Transactional public void publicMethod() { }
@Transactional public void wrapperMethod() { privateMethod(); } }
|
# 2. 自调用问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@Service public class TransactionFailureExample2 { @Transactional public void methodA() { methodB(); } @Transactional public void methodB() { } }
@Service public class TransactionSolution2_1 { @Autowired private TransactionSolution2_1 self; @Transactional public void methodA() { self.methodB(); } @Transactional public void methodB() { } }
@Service public class TransactionSolution2_2 { @Transactional public void methodA() { ((TransactionSolution2_2) AopContext.currentProxy()).methodB(); } @Transactional public void methodB() { } }
|
# 3. 异常处理问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
@Service public class TransactionFailureExample3 { @Transactional public void methodWithTryCatch() { try { riskyOperation(); } catch (Exception e) { log.error("操作失败", e); } } }
@Service public class TransactionSolution3_1 { @Transactional public void methodWithTryCatch() { try { riskyOperation(); } catch (Exception e) { log.error("操作失败", e); throw e; } } }
@Service public class TransactionSolution3_2 { @Transactional public void methodWithManualRollback() { try { riskyOperation(); } catch (Exception e) { log.error("操作失败", e); TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } } }
|
# 4. 数据库引擎问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Service public class TransactionFailureExample4 { @Transactional public void operationOnMyISAM() { myisamRepository.insert(data); } }
@Service public class TransactionSolution4 { @Transactional public void operationOnInnoDB() { innodbRepository.insert(data); } }
|
# 分布式事务实战
# JTA 分布式事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
@Configuration @EnableTransactionManagement public class JtaTransactionConfig {
@Bean public PlatformTransactionManager transactionManager() { JtaTransactionManager transactionManager = new JtaTransactionManager(); transactionManager.setTransactionManagerName("java:/TransactionManager"); return transactionManager; }
@Bean @Primary public DataSource primaryDataSource() { return DataSourceBuilder.create() .url("jdbc:mysql://localhost:3306/db1") .username("user") .password("password") .build(); } @Bean public DataSource secondaryDataSource() { return DataSourceBuilder.create() .url("jdbc:mysql://localhost:3306/db2") .username("user") .password("password") .build(); } }
@Service public class DistributedTransactionService { @Autowired @Qualifier("primaryDataSource") private DataSource primaryDataSource; @Autowired @Qualifier("secondaryDataSource") private DataSource secondaryDataSource;
@Transactional public void crossDatabaseOperation() { JdbcTemplate primaryTemplate = new JdbcTemplate(primaryDataSource); primaryTemplate.update("INSERT INTO table1 (name) VALUES (?)", "test1"); JdbcTemplate secondaryTemplate = new JdbcTemplate(secondaryDataSource); secondaryTemplate.update("INSERT INTO table2 (name) VALUES (?)", "test2"); } }
|
# Saga 模式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
@Service @Slf4j public class SagaTransactionService { @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService; @Autowired private PaymentService paymentService;
public void executeSaga(Order order) { List<SagaStep> steps = Arrays.asList( new CreateOrderStep(orderService, order), new ReduceInventoryStep(inventoryService, order), new ProcessPaymentStep(paymentService, order) ); SagaTransaction saga = new SagaTransaction(steps); try { saga.execute(); log.info("Saga事务执行成功"); } catch (Exception e) { log.error("Saga事务执行失败,开始补偿", e); saga.compensate(); } } }
public interface SagaStep { void execute() throws Exception; void compensate() throws Exception; }
public class CreateOrderStep implements SagaStep { private final OrderService orderService; private final Order order; public CreateOrderStep(OrderService orderService, Order order) { this.orderService = orderService; this.order = order; } @Override public void execute() throws Exception { orderService.createOrder(order); } @Override public void compensate() throws Exception { orderService.cancelOrder(order.getId()); } }
public class ReduceInventoryStep implements SagaStep { private final InventoryService inventoryService; private final Order order; public ReduceInventoryStep(InventoryService inventoryService, Order order) { this.inventoryService = inventoryService; this.order = order; } @Override public void execute() throws Exception { inventoryService.reduceStock(order.getProductId(), order.getQuantity()); } @Override public void compensate() throws Exception { inventoryService.addStock(order.getProductId(), order.getQuantity()); } }
public class SagaTransaction { private final List<SagaStep> steps; private final List<SagaStep> executedSteps = new ArrayList<>(); public SagaTransaction(List<SagaStep> steps) { this.steps = steps; } public void execute() throws Exception { for (SagaStep step : steps) { try { step.execute(); executedSteps.add(step); } catch (Exception e) { throw new SagaExecutionException("Saga步骤执行失败", e); } } } public void compensate() { for (int i = executedSteps.size() - 1; i >= 0; i--) { try { executedSteps.get(i).compensate(); } catch (Exception e) { log.error("补偿操作失败", e); } } } }
|
# 事务监控与性能优化
# 事务监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
@Aspect @Component @Slf4j public class TransactionMonitorAspect { @Autowired private MeterRegistry meterRegistry;
@Around("@annotation(org.springframework.transaction.annotation.Transactional)") public Object monitorTransaction(ProceedingJoinPoint joinPoint) throws Throwable { String methodName = joinPoint.getSignature().toShortString(); Timer.Sample sample = Timer.start(meterRegistry); try { Object result = joinPoint.proceed(); meterRegistry.counter("transaction.success", "method", methodName).increment(); return result; } catch (Exception e) { meterRegistry.counter("transaction.failure", "method", methodName, "exception", e.getClass().getSimpleName()).increment(); throw e; } finally { sample.stop(Timer.builder("transaction.duration") .tag("method", methodName) .register(meterRegistry)); } } }
@Service @Slf4j public class TransactionPerformanceService { @Autowired private TransactionTemplate transactionTemplate;
public void batchInsertOptimization(List<Order> orders) { for (Order order : orders) { transactionTemplate.execute(status -> { orderRepository.save(order); return null; }); } transactionTemplate.execute(status -> { for (Order order : orders) { orderRepository.save(order); } return null; }); }
public void longTransactionSplitting(List<Order> orders) { int batchSize = 100; for (int i = 0; i < orders.size(); i += batchSize) { int endIndex = Math.min(i + batchSize, orders.size()); List<Order> batch = orders.subList(i, endIndex); processBatch(batch); } } @Transactional(timeout = 30) public void processBatch(List<Order> orders) { for (Order order : orders) { processOrder(order); } } private void processOrder(Order order) { } @Autowired private OrderRepository orderRepository; }
|
# 事务最佳实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
@Service @Slf4j public class TransactionBestPractices {
@Transactional public void appropriateGranularity(Order order) { orderRepository.save(order); inventoryRepository.reduceStock(order.getProductId(), order.getQuantity()); orderLogRepository.insertLog(order.getId(), "订单创建"); }
@Transactional(readOnly = true) public List<Order> queryOrders(OrderQueryCondition condition) { return orderRepository.findByCondition(condition); }
@Transactional(timeout = 60) public void complexBusinessOperation() { }
@Transactional(rollbackFor = BusinessException.class) public void explicitExceptionHandling() { if (invalidCondition()) { throw new BusinessException("业务异常,明确回滚"); } }
@Transactional public void avoidTimeConsumingOperations(Order order) { orderRepository.save(order); } private boolean invalidCondition() { return false; } @Autowired private OrderRepository orderRepository; @Autowired private InventoryRepository inventoryRepository; @Autowired private OrderLogRepository orderLogRepository; }
|
# 总结:事务管理的智慧
Spring 事务管理是一个看似简单但实则复杂的主题。通过这篇文章,我们深入探讨了:
- 理论基础:ACID 特性和隔离级别是理解事务的基础
- 核心机制:PlatformTransactionManager 和事务传播行为是 Spring 事务的核心
- 实战技巧:声明式事务的使用和常见问题的解决
- 高级应用:分布式事务和性能优化
在我的开发经验中,事务管理的最佳实践可以总结为:
- 理解业务需求:不是所有操作都需要事务,要根据业务场景合理选择
- 控制事务边界:事务粒度要适当,既不能太大也不能太小
- 处理异常情况:明确哪些异常需要回滚,哪些不需要
- 监控事务性能:及时发现和解决长事务问题
- 测试事务逻辑:确保事务在各种异常情况下都能正确工作