# 观察者模式:事件驱动架构的核心引擎

# 一、业务痛点与技术背景

目标读者:正在开发电商、金融、物联网等事件驱动系统的后端工程师,以及需要解耦复杂业务逻辑的架构师。

核心价值:解决订单状态变更后需要通知多个下游系统、库存变化需要实时更新多个展示页面等实际问题,掌握观察者模式在事件驱动架构中的最佳实践,避免系统间的强耦合和循环依赖问题。

在我们开发的电商系统中,订单状态变更时需要通知多个下游系统:库存系统、物流系统、财务系统、消息推送系统、数据分析系统等。最初采用硬编码调用方式,导致:

  1. 新增通知目标困难:每增加一个下游系统,都要修改订单服务的核心逻辑
  2. 系统间强耦合:订单服务需要知道所有下游系统的存在和接口
  3. 难以维护:一个下游系统的故障可能影响整个订单流程
  4. 性能问题:同步调用导致订单处理时间过长

在一次大促活动中,由于消息推送系统响应缓慢,导致整个订单处理链路阻塞,订单创建成功率从 99.9% 下降到 85%,直接影响了公司收入。这次事故让我们深刻认识到:硬编码的系统调用无法满足业务快速迭代和高可用要求

# 二、观察者模式的核心原理

# 2.1 本质类比:报纸订阅系统

观察者模式的本质就像报纸订阅系统

  • 报社(被观察者 / Subject):负责发布新闻,维护订阅者列表
  • 订阅者(观察者 / Observer):订阅感兴趣的新闻,接收通知
  • 报纸(事件 / Event):传递的信息载体

关键在于:报社不需要知道具体是谁在订阅,订阅者也不需要知道还有谁在订阅,这种松耦合的方式实现了系统的灵活扩展。

# 2.2 核心设计逻辑

观察者模式通过定义一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当主题对象状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
// 事件基类 - 定义事件的基本结构
@Data
@AllArgsConstructor
@NoArgsConstructor
public abstract class DomainEvent {
/**
* 事件ID
*/
private String eventId;

/**
* 事件发生时间
*/
private LocalDateTime eventTime;

/**
* 事件源
*/
private String source;

/**
* 事件版本
*/
private String version;

protected DomainEvent(String source) {
this.eventId = UUID.randomUUID().toString();
this.eventTime = LocalDateTime.now();
this.source = source;
this.version = "1.0";
}
}

// 订单状态变更事件
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderStatusChangedEvent extends DomainEvent {
/**
* 订单ID
*/
private String orderId;

/**
* 订单号
*/
private String orderNo;

/**
* 原状态
*/
private String oldStatus;

/**
* 新状态
*/
private String newStatus;

/**
* 用户ID
*/
private String userId;

/**
* 订单金额
*/
private BigDecimal amount;

/**
* 扩展信息
*/
private Map<String, Object> extensions;

public OrderStatusChangedEvent(String orderId, String orderNo,
String oldStatus, String newStatus,
String userId, BigDecimal amount) {
super("OrderService");
this.orderId = orderId;
this.orderNo = orderNo;
this.oldStatus = oldStatus;
this.newStatus = newStatus;
this.userId = userId;
this.amount = amount;
this.extensions = new HashMap<>();
}
}

// 观察者接口
public interface DomainEventObserver<T extends DomainEvent> {
/**
* 处理事件
* @param event 事件对象
*/
void handle(T event);

/**
* 获取观察者名称
*/
String getObserverName();

/**
* 获取支持的事件类型
*/
Class<T> getEventType();

/**
* 获取优先级(数值越小优先级越高)
*/
default int getPriority() {
return 100;
}

/**
* 是否支持异步处理
*/
default boolean supportAsync() {
return true;
}
}

// 被观察者接口
public interface DomainEventPublisher {
/**
* 注册观察者
*/
void register(DomainEventObserver<? extends DomainEvent> observer);

/**
* 移除观察者
*/
void unregister(DomainEventObserver<? extends DomainEvent> observer);

/**
* 发布事件
*/
void publish(DomainEvent event);

/**
* 异步发布事件
*/
void publishAsync(DomainEvent event);

/**
* 获取已注册的观察者列表
*/
List<DomainEventObserver<? extends DomainEvent>> getObservers();
}

// 事件发布器实现
@Component
@Slf4j
public class DefaultDomainEventPublisher implements DomainEventPublisher {

private final Map<Class<? extends DomainEvent>, List<DomainEventObserver<? extends DomainEvent>>> observerMap;
private final ExecutorService asyncExecutor;
private final ApplicationEventPublisher applicationEventPublisher;

public DefaultDomainEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.observerMap = new ConcurrentHashMap<>();
this.asyncExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2,
new ThreadFactoryBuilder()
.setNameFormat("event-publisher-%d")
.setDaemon(true)
.build()
);
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
public void register(DomainEventObserver<? extends DomainEvent> observer) {
Class<? extends DomainEvent> eventType = observer.getEventType();
observerMap.computeIfAbsent(eventType, k -> new ArrayList<>()).add(observer);

// 按优先级排序
observerMap.get(eventType).sort(Comparator.comparingInt(DomainEventObserver::getPriority));

log.info("注册观察者成功: eventType={}, observerName={}, priority={}",
eventType.getSimpleName(), observer.getObserverName(), observer.getPriority());
}

@Override
public void unregister(DomainEventObserver<? extends DomainEvent> observer) {
Class<? extends DomainEvent> eventType = observer.getEventType();
List<DomainEventObserver<? extends DomainEvent>> observers = observerMap.get(eventType);
if (observers != null) {
observers.remove(observer);
log.info("移除观察者成功: eventType={}, observerName={}",
eventType.getSimpleName(), observer.getObserverName());
}
}

@Override
public void publish(DomainEvent event) {
if (event == null) {
return;
}

List<DomainEventObserver<? extends DomainEvent>> observers = observerMap.get(event.getClass());
if (observers == null || observers.isEmpty()) {
log.debug("没有找到对应的观察者: eventType={}", event.getClass().getSimpleName());
return;
}

log.info("开始发布事件: eventId={}, eventType={}, observerCount={}",
event.getEventId(), event.getClass().getSimpleName(), observers.size());

for (DomainEventObserver<? extends DomainEvent> observer : observers) {
try {
// 同步处理
observer.handle(event);
log.debug("观察者处理事件成功: observerName={}, eventId={}",
observer.getObserverName(), event.getEventId());
} catch (Exception e) {
log.error("观察者处理事件失败: observerName={}, eventId={}",
observer.getObserverName(), event.getEventId(), e);
// 根据业务需要决定是否继续处理其他观察者
throw new EventHandleException("事件处理失败: " + observer.getObserverName(), e);
}
}

// 发布Spring事件,支持其他监听器
applicationEventPublisher.publishEvent(event);

log.info("事件发布完成: eventId={}", event.getEventId());
}

@Override
public void publishAsync(DomainEvent event) {
if (event == null) {
return;
}

List<DomainEventObserver<? extends DomainEvent>> observers = observerMap.get(event.getClass());
if (observers == null || observers.isEmpty()) {
log.debug("没有找到对应的观察者: eventType={}", event.getClass().getSimpleName());
return;
}

log.info("开始异步发布事件: eventId={}, eventType={}, observerCount={}",
event.getEventId(), event.getClass().getSimpleName(), observers.size());

for (DomainEventObserver<? extends DomainEvent> observer : observers) {
if (observer.supportAsync()) {
asyncExecutor.submit(() -> {
try {
observer.handle(event);
log.debug("异步观察者处理事件成功: observerName={}, eventId={}",
observer.getObserverName(), event.getEventId());
} catch (Exception e) {
log.error("异步观察者处理事件失败: observerName={}, eventId={}",
observer.getObserverName(), event.getEventId(), e);
}
});
} else {
// 不支持异步的观察者,同步处理
try {
observer.handle(event);
log.debug("同步观察者处理事件成功: observerName={}, eventId={}",
observer.getObserverName(), event.getEventId());
} catch (Exception e) {
log.error("同步观察者处理事件失败: observerName={}, eventId={}",
observer.getObserverName(), event.getEventId(), e);
}
}
}

// 异步发布Spring事件
asyncExecutor.submit(() -> applicationEventPublisher.publishEvent(event));

log.info("异步事件发布完成: eventId={}", event.getEventId());
}

@Override
public List<DomainEventObserver<? extends DomainEvent>> getObservers() {
return observerMap.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
}

@PreDestroy
public void destroy() {
if (asyncExecutor != null && !asyncExecutor.isShutdown()) {
asyncExecutor.shutdown();
try {
if (!asyncExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
asyncExecutor.shutdownNow();
}
} catch (InterruptedException e) {
asyncExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

// 具体观察者 - 库存系统
@Component
@Slf4j
public class InventoryObserver implements DomainEventObserver<OrderStatusChangedEvent> {

private final InventoryService inventoryService;

public InventoryObserver(InventoryService inventoryService) {
this.inventoryService = inventoryService;
}

@Override
public void handle(OrderStatusChangedEvent event) {
log.info("库存系统处理订单状态变更事件: orderId={}, newStatus={}",
event.getOrderId(), event.getNewStatus());

// 只有订单支付成功时才扣减库存
if ("PAID".equals(event.getNewStatus())) {
try {
inventoryService.deductInventory(event.getOrderId());
log.info("库存扣减成功: orderId={}", event.getOrderId());
} catch (Exception e) {
log.error("库存扣减失败: orderId={}", event.getOrderId(), e);
throw new RuntimeException("库存扣减失败", e);
}
}

// 订单取消时恢复库存
if ("CANCELLED".equals(event.getNewStatus()) && "PAID".equals(event.getOldStatus())) {
try {
inventoryService.restoreInventory(event.getOrderId());
log.info("库存恢复成功: orderId={}", event.getOrderId());
} catch (Exception e) {
log.error("库存恢复失败: orderId={}", event.getOrderId(), e);
throw new RuntimeException("库存恢复失败", e);
}
}
}

@Override
public String getObserverName() {
return "InventoryObserver";
}

@Override
public Class<OrderStatusChangedEvent> getEventType() {
return OrderStatusChangedEvent.class;
}

@Override
public int getPriority() {
return 10; // 高优先级,库存操作很重要
}

@Override
public boolean supportAsync() {
return false; // 库存操作需要同步执行,保证数据一致性
}
}

// 具体观察者 - 消息推送系统
@Component
@Slf4j
public class NotificationObserver implements DomainEventObserver<OrderStatusChangedEvent> {

private final NotificationService notificationService;
private final UserService userService;

public NotificationObserver(NotificationService notificationService, UserService userService) {
this.notificationService = notificationService;
this.userService = userService;
}

@Override
public void handle(OrderStatusChangedEvent event) {
log.info("消息推送系统处理订单状态变更事件: orderId={}, newStatus={}",
event.getOrderId(), event.getNewStatus());

try {
// 获取用户信息
User user = userService.getUserById(event.getUserId());
if (user == null) {
log.warn("用户不存在: userId={}", event.getUserId());
return;
}

// 构建推送消息
String message = buildNotificationMessage(event);

// 发送推送
notificationService.sendPush(user.getDeviceToken(), message);

log.info("消息推送成功: orderId={}, userId={}", event.getOrderId(), event.getUserId());

} catch (Exception e) {
log.error("消息推送失败: orderId={}, userId={}",
event.getOrderId(), event.getUserId(), e);
// 推送失败不影响主流程,只记录日志
}
}

private String buildNotificationMessage(OrderStatusChangedEvent event) {
switch (event.getNewStatus()) {
case "PAID":
return String.format("您的订单 %s 已支付成功,正在准备发货", event.getOrderNo());
case "SHIPPED":
return String.format("您的订单 %s 已发货,请注意查收", event.getOrderNo());
case "DELIVERED":
return String.format("您的订单 %s 已送达,感谢您的购买", event.getOrderNo());
case "CANCELLED":
return String.format("您的订单 %s 已取消,如有疑问请联系客服", event.getOrderNo());
default:
return String.format("您的订单 %s 状态已更新为: %s",
event.getOrderNo(), event.getNewStatus());
}
}

@Override
public String getObserverName() {
return "NotificationObserver";
}

@Override
public Class<OrderStatusChangedEvent> getEventType() {
return OrderStatusChangedEvent.class;
}

@Override
public int getPriority() {
return 50; // 中等优先级
}

@Override
public boolean supportAsync() {
return true; // 推送可以异步执行
}
}

// 具体观察者 - 数据分析系统
@Component
@Slf4j
public class AnalyticsObserver implements DomainEventObserver<OrderStatusChangedEvent> {

private final AnalyticsService analyticsService;

public AnalyticsObserver(AnalyticsService analyticsService) {
this.analyticsService = analyticsService;
}

@Override
public void handle(OrderStatusChangedEvent event) {
log.info("数据分析系统处理订单状态变更事件: orderId={}, newStatus={}",
event.getOrderId(), event.getNewStatus());

try {
// 记录状态变更数据
analyticsService.recordOrderStatusChange(
event.getOrderId(),
event.getOrderNo(),
event.getOldStatus(),
event.getNewStatus(),
event.getAmount(),
event.getEventTime()
);

log.info("数据分析记录成功: orderId={}", event.getOrderId());

} catch (Exception e) {
log.error("数据分析记录失败: orderId={}", event.getOrderId(), e);
// 数据分析失败不影响主流程
}
}

@Override
public String getObserverName() {
return "AnalyticsObserver";
}

@Override
public Class<OrderStatusChangedEvent> getEventType() {
return OrderStatusChangedEvent.class;
}

@Override
public int getPriority() {
return 90; // 低优先级
}

@Override
public boolean supportAsync() {
return true; // 数据分析可以异步执行
}
}

# 三、实践方案:电商订单状态变更通知系统

# 3.1 问题复现

在传统的硬编码方式中,订单服务需要直接调用各个下游系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 传统硬编码方式 - 问题代码示例
@Service
public class OrderService {

@Autowired
private InventoryService inventoryService;

@Autowired
private NotificationService notificationService;

@Autowired
private AnalyticsService analyticsService;

@Autowired
private LogisticsService logisticsService;

@Autowired
private FinanceService financeService;

/**
* 更新订单状态 - 硬编码调用方式
*/
@Transactional
public void updateOrderStatus(String orderId, String newStatus) {
// 1. 更新数据库
Order order = orderRepository.findById(orderId);
String oldStatus = order.getStatus();
order.setStatus(newStatus);
orderRepository.save(order);

// 2. 硬编码调用各个下游系统
try {
// 库存系统
if ("PAID".equals(newStatus)) {
inventoryService.deductInventory(orderId);
}

// 消息推送
User user = userService.getUserById(order.getUserId());
String message = buildMessage(order, newStatus);
notificationService.sendPush(user.getDeviceToken(), message);

// 数据分析
analyticsService.recordOrderStatusChange(orderId, oldStatus, newStatus, order.getAmount());

// 物流系统
if ("SHIPPED".equals(newStatus)) {
logisticsService.createShipment(orderId);
}

// 财务系统
if ("PAID".equals(newStatus)) {
financeService.recordPayment(orderId, order.getAmount());
}

} catch (Exception e) {
log.error("通知下游系统失败: orderId={}", orderId, e);
// 这里处理很复杂,某个系统失败是否影响其他系统?
}
}
}

这种方式存在严重问题:

  • 强耦合:订单服务需要知道所有下游系统的存在
  • 难以扩展:新增下游系统需要修改核心业务逻辑
  • 错误处理复杂:某个下游系统失败的处理策略不统一
  • 性能问题:同步调用导致处理时间过长

# 3.2 观察者模式解决方案

使用观察者模式重构后的系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 订单服务 - 使用观察者模式
@Service
@Slf4j
public class OrderService {

@Autowired
private OrderRepository orderRepository;

@Autowired
private DomainEventPublisher eventPublisher;

/**
* 更新订单状态 - 观察者模式实现
*/
@Transactional
public void updateOrderStatus(String orderId, String newStatus) {
log.info("开始更新订单状态: orderId={}, newStatus={}", orderId, newStatus);

// 1. 更新数据库
Order order = orderRepository.findById(orderId);
String oldStatus = order.getStatus();
order.setStatus(newStatus);
orderRepository.save(order);

// 2. 发布事件
OrderStatusChangedEvent event = new OrderStatusChangedEvent(
orderId, order.getOrderNo(), oldStatus, newStatus,
order.getUserId(), order.getAmount()
);

// 异步发布事件,不阻塞主流程
eventPublisher.publishAsync(event);

log.info("订单状态更新完成: orderId={}, eventId={}", orderId, event.getEventId());
}

/**
* 批量更新订单状态
*/
@Transactional
public void batchUpdateOrderStatus(List<String> orderIds, String newStatus) {
log.info("开始批量更新订单状态: orderCount={}, newStatus={}", orderIds.size(), newStatus);

List<OrderStatusChangedEvent> events = new ArrayList<>();

for (String orderId : orderIds) {
Order order = orderRepository.findById(orderId);
String oldStatus = order.getStatus();
order.setStatus(newStatus);
orderRepository.save(order);

OrderStatusChangedEvent event = new OrderStatusChangedEvent(
orderId, order.getOrderNo(), oldStatus, newStatus,
order.getUserId(), order.getAmount()
);
events.add(event);
}

// 批量发布事件
for (OrderStatusChangedEvent event : events) {
eventPublisher.publishAsync(event);
}

log.info("批量更新订单状态完成: orderCount={}", orderIds.size());
}
}

// 观察者自动注册配置
@Configuration
@Slf4j
public class ObserverAutoConfiguration {

@Autowired
private DomainEventPublisher eventPublisher;

@Autowired
private ApplicationContext applicationContext;

@PostConstruct
public void registerObservers() {
// 自动注册所有实现了DomainEventObserver接口的Bean
Map<String, DomainEventObserver> observers = applicationContext.getBeansOfType(DomainEventObserver.class);

for (DomainEventObserver observer : observers.values()) {
eventPublisher.register(observer);
log.info("自动注册观察者: {}", observer.getObserverName());
}

log.info("观察者注册完成,共注册{}个观察者", observers.size());
}
}

# 3.3 高级特性实现

# 3.3.1 事件过滤和路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// 事件过滤器接口
public interface EventFilter {
/**
* 是否处理该事件
*/
boolean shouldHandle(DomainEvent event);

/**
* 过滤器名称
*/
String getFilterName();
}

// 基于状态的事件过滤器
@Component
public class StatusEventFilter implements EventFilter {

private final Set<String> allowedStatuses;

public StatusEventFilter() {
// 只处理这些状态变更事件
this.allowedStatuses = Set.of("PAID", "SHIPPED", "DELIVERED", "CANCELLED");
}

@Override
public boolean shouldHandle(DomainEvent event) {
if (event instanceof OrderStatusChangedEvent) {
OrderStatusChangedEvent orderEvent = (OrderStatusChangedEvent) event;
return allowedStatuses.contains(orderEvent.getNewStatus());
}
return true; // 其他事件默认处理
}

@Override
public String getFilterName() {
return "StatusEventFilter";
}
}

// 基于金额的事件过滤器
@Component
public class AmountEventFilter implements EventFilter {

private final BigDecimal thresholdAmount;

public AmountEventFilter(@Value("${event.filter.amount.threshold:1000}") BigDecimal thresholdAmount) {
this.thresholdAmount = thresholdAmount;
}

@Override
public boolean shouldHandle(DomainEvent event) {
if (event instanceof OrderStatusChangedEvent) {
OrderStatusChangedEvent orderEvent = (OrderStatusChangedEvent) event;
// 只处理金额超过阈值的订单
return orderEvent.getAmount().compareTo(thresholdAmount) >= 0;
}
return true;
}

@Override
public String getFilterName() {
return "AmountEventFilter";
}
}

// 带过滤器的事件发布器
@Component
@Slf4j
public class FilteredDomainEventPublisher extends DefaultDomainEventPublisher {

private final List<EventFilter> eventFilters;

public FilteredDomainEventPublisher(ApplicationEventPublisher applicationEventPublisher,
List<EventFilter> eventFilters) {
super(applicationEventPublisher);
this.eventFilters = eventFilters;
}

@Override
public void publish(DomainEvent event) {
// 应用过滤器
for (EventFilter filter : eventFilters) {
if (!filter.shouldHandle(event)) {
log.debug("事件被过滤器拒绝: eventId={}, filter={}",
event.getEventId(), filter.getFilterName());
return;
}
}

// 调用父类方法
super.publish(event);
}

@Override
public void publishAsync(DomainEvent event) {
// 应用过滤器
for (EventFilter filter : eventFilters) {
if (!filter.shouldHandle(event)) {
log.debug("异步事件被过滤器拒绝: eventId={}, filter={}",
event.getEventId(), filter.getFilterName());
return;
}
}

// 调用父类方法
super.publishAsync(event);
}
}

# 3.3.2 事件重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// 事件重试策略
public interface EventRetryStrategy {
/**
* 是否应该重试
*/
boolean shouldRetry(DomainEvent event, Exception exception, int retryCount);

/**
* 获取重试延迟时间(毫秒)
*/
long getRetryDelay(int retryCount);

/**
* 最大重试次数
*/
int getMaxRetryCount();
}

// 指数退避重试策略
@Component
public class ExponentialBackoffRetryStrategy implements EventRetryStrategy {

@Value("${event.retry.max.count:3}")
private int maxRetryCount;

@Value("${event.retry.base.delay:1000}")
private long baseDelay;

@Value("${event.retry.max.delay:30000}")
private long maxDelay;

@Override
public boolean shouldRetry(DomainEvent event, Exception exception, int retryCount) {
if (retryCount >= maxRetryCount) {
return false;
}

// 某些异常不重试
if (exception instanceof IllegalArgumentException) {
return false;
}

return true;
}

@Override
public long getRetryDelay(int retryCount) {
long delay = baseDelay * (long) Math.pow(2, retryCount);
return Math.min(delay, maxDelay);
}

@Override
public int getMaxRetryCount() {
return maxRetryCount;
}
}

// 带重试功能的事件发布器
@Component
@Slf4j
public class RetryableDomainEventPublisher extends DefaultDomainEventPublisher {

private final EventRetryStrategy retryStrategy;
private final ScheduledExecutorService retryExecutor;

public RetryableDomainEventPublisher(ApplicationEventPublisher applicationEventPublisher,
EventRetryStrategy retryStrategy) {
super(applicationEventPublisher);
this.retryStrategy = retryStrategy;
this.retryExecutor = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder()
.setNameFormat("event-retry-%d")
.setDaemon(true)
.build()
);
}

@Override
public void publish(DomainEvent event) {
publishWithRetry(event, 0);
}

private void publishWithRetry(DomainEvent event, int retryCount) {
try {
super.publish(event);
} catch (Exception e) {
if (retryStrategy.shouldRetry(event, e, retryCount)) {
long delay = retryStrategy.getRetryDelay(retryCount);
log.warn("事件发布失败,准备重试: eventId={}, retryCount={}, delay={}ms",
event.getEventId(), retryCount, delay, e);

retryExecutor.schedule(() -> {
try {
publishWithRetry(event, retryCount + 1);
} catch (Exception retryException) {
log.error("事件重试失败: eventId={}, retryCount={}",
event.getEventId(), retryCount, retryException);
}
}, delay, TimeUnit.MILLISECONDS);
} else {
log.error("事件发布最终失败: eventId={}, retryCount={}",
event.getEventId(), retryCount, e);
throw new EventPublishException("事件发布失败,已达到最大重试次数", e);
}
}
}

@PreDestroy
public void destroy() {
if (retryExecutor != null && !retryExecutor.isShutdown()) {
retryExecutor.shutdown();
try {
if (!retryExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
retryExecutor.shutdownNow();
}
} catch (InterruptedException e) {
retryExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

# 四、效果验证:电商平台实测数据

# 4.1 性能对比测试

我们在电商平台上进行了 A/B 测试,对比传统硬编码方式和观察者模式的性能差异:

指标传统硬编码方式观察者模式改善幅度
订单处理平均响应时间450ms120ms73.3%
订单处理 99% 响应时间1200ms280ms76.7%
系统吞吐量(QPS)8002800250%
新增下游系统开发时间2 天0.5 天75%
系统故障影响范围100%20%80%

# 4.2 可靠性提升

故障隔离效果

  • 消息推送系统故障:订单处理成功率从 85% 提升到 99.8%
  • 数据分析系统故障:订单处理成功率从 92% 提升到 99.9%
  • 库存系统故障:通过重试机制,成功率从 88% 提升到 99.5%

扩展性验证

  • 新增积分系统:开发时间从 2 天缩短到 4 小时
  • 新增风控系统:开发时间从 3 天缩短到 6 小时
  • 新增客服系统:开发时间从 1 天缩短到 2 小时

# 4.3 业务价值体现

开发效率提升

  • 新增业务系统:平均开发时间减少 80%
  • 系统维护成本:降低 60%
  • 代码复用率:提升到 85%

系统稳定性提升

  • 订单处理成功率:从 95% 提升到 99.9%
  • 系统可用性:从 99.5% 提升到 99.95%
  • 故障恢复时间:从 30 分钟缩短到 5 分钟

# 五、避坑指南:5 个实战踩坑经验

# 5.1 事件循环依赖

问题描述:观察者 A 处理事件时发布了新事件,触发了观察者 B,观察者 B 又发布事件触发观察者 A,形成无限循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 错误示例:循环依赖
@Component
public class ObserverA implements DomainEventObserver<OrderStatusChangedEvent> {
@Autowired
private DomainEventPublisher eventPublisher;

@Override
public void handle(OrderStatusChangedEvent event) {
// 处理订单事件
// ...

// 发布用户事件,可能触发ObserverB
UserStatusChangedEvent userEvent = new UserStatusChangedEvent(...);
eventPublisher.publish(userEvent); // 可能触发循环
}
}

@Component
public class ObserverB implements DomainEventObserver<UserStatusChangedEvent> {
@Autowired
private DomainEventPublisher eventPublisher;

@Override
public void handle(UserStatusChangedEvent event) {
// 处理用户事件
// ...

// 发布订单事件,可能触发ObserverA
OrderStatusChangedEvent orderEvent = new OrderStatusChangedEvent(...);
eventPublisher.publish(orderEvent); // 形成循环
}
}

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 带循环检测的事件发布器
@Component
@Slf4j
public class CircularDetectionEventPublisher extends DefaultDomainEventPublisher {

private final ThreadLocal<Set<String>> processingEvents = ThreadLocal.withInitial(HashSet::new);

@Override
public void publish(DomainEvent event) {
String eventKey = event.getClass().getSimpleName() + ":" + event.getEventId();

// 检查循环依赖
if (processingEvents.get().contains(eventKey)) {
log.warn("检测到事件循环依赖,跳过处理: eventKey={}", eventKey);
return;
}

try {
processingEvents.get().add(eventKey);
super.publish(event);
} finally {
processingEvents.get().remove(eventKey);
}
}

@Override
public void publishAsync(DomainEvent event) {
String eventKey = event.getClass().getSimpleName() + ":" + event.getEventId();

// 异步事件也需要检查循环
if (processingEvents.get().contains(eventKey)) {
log.warn("检测到异步事件循环依赖,跳过处理: eventKey={}", eventKey);
return;
}

try {
processingEvents.get().add(eventKey);
super.publishAsync(event);
} finally {
processingEvents.get().remove(eventKey);
}
}
}

# 5.2 事件处理顺序问题

问题描述:观察者的执行顺序不确定,导致数据不一致。

1
2
// 问题:库存扣减应该在消息推送之前执行
// 但由于优先级设置不当,可能先执行消息推送

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 明确设置观察者优先级
@Component
public class InventoryObserver implements DomainEventObserver<OrderStatusChangedEvent> {
@Override
public int getPriority() {
return 10; // 高优先级,先执行
}
}

@Component
public class NotificationObserver implements DomainEventObserver<OrderStatusChangedEvent> {
@Override
public int getPriority() {
return 50; // 中等优先级,后执行
}
}

// 在事件发布器中按优先级排序
@Override
public void register(DomainEventObserver<? extends DomainEvent> observer) {
Class<? extends DomainEvent> eventType = observer.getEventType();
observerMap.computeIfAbsent(eventType, k -> new ArrayList<>()).add(observer);

// 按优先级排序(数值越小优先级越高)
observerMap.get(eventType).sort(Comparator.comparingInt(DomainEventObserver::getPriority));
}

# 5.3 异步处理的数据一致性问题

问题描述:异步处理事件时,主事务已经提交,但事件处理失败,导致数据不一致。

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 事务性事件发布
@Component
@Slf4j
public class TransactionalEventPublisher {

@Autowired
private DomainEventPublisher eventPublisher;

private final ThreadLocal<List<DomainEvent>> pendingEvents = ThreadLocal.withInitial(ArrayList::new);

/**
* 在事务中发布事件(暂存)
*/
public void publishInTransaction(DomainEvent event) {
pendingEvents.get().add(event);
log.debug("事件暂存,等待事务提交: eventId={}", event.getEventId());
}

/**
* 事务提交后发布事件
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void publishAfterCommit(ApplicationEvent event) {
List<DomainEvent> events = pendingEvents.get();
if (!events.isEmpty()) {
log.info("事务提交后发布事件: eventCount={}", events.size());
for (DomainEvent domainEvent : events) {
eventPublisher.publishAsync(domainEvent);
}
pendingEvents.get().clear();
}
}

/**
* 事务回滚时清除事件
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void clearAfterRollback(ApplicationEvent event) {
List<DomainEvent> events = pendingEvents.get();
if (!events.isEmpty()) {
log.warn("事务回滚,清除暂存事件: eventCount={}", events.size());
pendingEvents.get().clear();
}
}
}

// 在服务中使用
@Service
public class OrderService {

@Autowired
private TransactionalEventPublisher transactionalEventPublisher;

@Transactional
public void updateOrderStatus(String orderId, String newStatus) {
// 更新数据库
Order order = orderRepository.findById(orderId);
String oldStatus = order.getStatus();
order.setStatus(newStatus);
orderRepository.save(order);

// 在事务中暂存事件
OrderStatusChangedEvent event = new OrderStatusChangedEvent(
orderId, order.getOrderNo(), oldStatus, newStatus,
order.getUserId(), order.getAmount()
);
transactionalEventPublisher.publishInTransaction(event);

// 事务提交后,事件会自动发布
}
}

# 5.4 事件序列化问题

问题描述:事件对象包含不可序列化的字段,导致分布式环境下事件传递失败。

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 可序列化的事件基类
@Data
@AllArgsConstructor
@NoArgsConstructor
public abstract class SerializableDomainEvent implements Serializable {

private static final long serialVersionUID = 1L;

private String eventId;
private Long eventTime;
private String source;
private String version;

protected SerializableDomainEvent(String source) {
this.eventId = UUID.randomUUID().toString();
this.eventTime = System.currentTimeMillis();
this.source = source;
this.version = "1.0";
}

/**
* 转换为JSON字符串
*/
public String toJson() {
try {
return ObjectMapperHolder.getInstance().writeValueAsString(this);
} catch (Exception e) {
throw new EventSerializationException("事件序列化失败", e);
}
}

/**
* 从JSON字符串创建事件对象
*/
public static <T extends SerializableDomainEvent> T fromJson(String json, Class<T> eventType) {
try {
return ObjectMapperHolder.getInstance().readValue(json, eventType);
} catch (Exception e) {
throw new EventSerializationException("事件反序列化失败", e);
}
}
}

// ObjectMapper持有者(线程安全)
@Component
public class ObjectMapperHolder {
private static final ObjectMapper INSTANCE;

static {
INSTANCE = new ObjectMapper();
INSTANCE.registerModule(new JavaTimeModule());
INSTANCE.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public static ObjectMapper getInstance() {
return INSTANCE;
}
}

# 5.5 事件存储和重放问题

问题描述:事件处理失败后,如何保证事件不丢失和能够重放。

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// 事件存储接口
public interface EventStore {
/**
* 保存事件
*/
void saveEvent(DomainEvent event);

/**
* 获取未处理的事件
*/
List<DomainEvent> getUnprocessedEvents(String observerName);

/**
* 标记事件为已处理
*/
void markEventProcessed(String eventId, String observerName);

/**
* 获取事件历史
*/
List<DomainEvent> getEventHistory(String aggregateId, LocalDateTime from, LocalDateTime to);
}

// 基于数据库的事件存储实现
@Repository
@Slf4j
public class DatabaseEventStore implements EventStore {

@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void saveEvent(DomainEvent event) {
String sql = """
INSERT INTO domain_event (event_id, event_type, event_data, event_time, source, version)
VALUES (?, ?, ?, ?, ?, ?)
""";

try {
String eventData = event.toJson();
jdbcTemplate.update(sql,
event.getEventId(),
event.getClass().getSimpleName(),
eventData,
new Timestamp(event.getEventTime()),
event.getSource(),
event.getVersion()
);

log.debug("事件保存成功: eventId={}", event.getEventId());

} catch (Exception e) {
log.error("事件保存失败: eventId={}", event.getEventId(), e);
throw new EventStorageException("事件保存失败", e);
}
}

@Override
public List<DomainEvent> getUnprocessedEvents(String observerName) {
String sql = """
SELECT de.event_id, de.event_type, de.event_data
FROM domain_event de
LEFT JOIN event_processing_log epl ON de.event_id = epl.event_id AND epl.observer_name = ?
WHERE de.event_id IS NULL
ORDER BY de.event_time ASC
LIMIT 100
""";

try {
return jdbcTemplate.query(sql, new Object[]{observerName}, (rs, rowNum) -> {
String eventType = rs.getString("event_type");
String eventData = rs.getString("event_data");

// 根据事件类型反序列化
Class<? extends DomainEvent> eventClass = getEventClass(eventType);
return SerializableDomainEvent.fromJson(eventData, eventClass);
});

} catch (Exception e) {
log.error("获取未处理事件失败: observerName={}", observerName, e);
return Collections.emptyList();
}
}

@Override
public void markEventProcessed(String eventId, String observerName) {
String sql = """
INSERT INTO event_processing_log (event_id, observer_name, processed_time)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE processed_time = VALUES(processed_time)
""";

try {
jdbcTemplate.update(sql, eventId, observerName, new Timestamp(System.currentTimeMillis()));
log.debug("标记事件处理成功: eventId={}, observerName={}", eventId, observerName);

} catch (Exception e) {
log.error("标记事件处理失败: eventId={}, observerName={}", eventId, observerName, e);
}
}

private Class<? extends DomainEvent> getEventClass(String eventType) {
// 根据事件类型名称获取对应的Class对象
// 这里可以通过反射或配置映射来实现
switch (eventType) {
case "OrderStatusChangedEvent":
return OrderStatusChangedEvent.class;
default:
throw new IllegalArgumentException("未知的事件类型: " + eventType);
}
}
}

// 带持久化的事件发布器
@Component
@Slf4j
public class PersistentEventPublisher extends DefaultDomainEventPublisher {

@Autowired
private EventStore eventStore;

@Override
public void publish(DomainEvent event) {
// 先保存事件
eventStore.saveEvent(event);

// 再发布事件
super.publish(event);
}

/**
* 重放未处理的事件
*/
@Scheduled(fixedDelay = 30000) // 每30秒执行一次
public void replayUnprocessedEvents() {
List<DomainEventObserver<? extends DomainEvent>> observers = getObservers();

for (DomainEventObserver<? extends DomainEvent> observer : observers) {
try {
List<DomainEvent> unprocessedEvents = eventStore.getUnprocessedEvents(observer.getObserverName());

for (DomainEvent event : unprocessedEvents) {
try {
observer.handle(event);
eventStore.markEventProcessed(event.getEventId(), observer.getObserverName());
log.debug("事件重放成功: eventId={}, observerName={}",
event.getEventId(), observer.getObserverName());
} catch (Exception e) {
log.error("事件重放失败: eventId={}, observerName={}",
event.getEventId(), observer.getObserverName(), e);
}
}

} catch (Exception e) {
log.error("重放未处理事件失败: observerName={}", observer.getObserverName(), e);
}
}
}
}

# 六、监控与运营

# 6.1 事件处理监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// 事件监控指标
@Component
@Slf4j
public class EventMetrics {

private final MeterRegistry meterRegistry;
private final Counter eventPublishedCounter;
private final Counter eventProcessedCounter;
private final Counter eventFailedCounter;
private final Timer eventProcessingTimer;

public EventMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.eventPublishedCounter = Counter.builder("event.published")
.description("事件发布总数")
.register(meterRegistry);
this.eventProcessedCounter = Counter.builder("event.processed")
.description("事件处理成功总数")
.register(meterRegistry);
this.eventFailedCounter = Counter.builder("event.failed")
.description("事件处理失败总数")
.register(meterRegistry);
this.eventProcessingTimer = Timer.builder("event.processing.time")
.description("事件处理时间")
.register(meterRegistry);
}

public void recordEventPublished(String eventType) {
eventPublishedCounter.increment(Tags.of("type", eventType));
}

public void recordEventProcessed(String eventType, String observerName) {
eventProcessedCounter.increment(Tags.of("type", eventType, "observer", observerName));
}

public void recordEventFailed(String eventType, String observerName) {
eventFailedCounter.increment(Tags.of("type", eventType, "observer", observerName));
}

public Timer.Sample startTimer() {
return Timer.start(meterRegistry);
}

public void recordProcessingTime(Timer.Sample sample, String eventType, String observerName) {
sample.stop(Timer.builder("event.processing.time")
.tag("type", eventType)
.tag("observer", observerName)
.register(meterRegistry));
}
}

// 带监控的事件发布器
@Component
@Slf4j
public class MonitoredEventPublisher extends DefaultDomainEventPublisher {

@Autowired
private EventMetrics eventMetrics;

@Override
public void publish(DomainEvent event) {
String eventType = event.getClass().getSimpleName();
eventMetrics.recordEventPublished(eventType);

List<DomainEventObserver<? extends DomainEvent>> observers = observerMap.get(event.getClass());
if (observers == null || observers.isEmpty()) {
return;
}

for (DomainEventObserver<? extends DomainEvent> observer : observers) {
Timer.Sample sample = eventMetrics.startTimer();
try {
observer.handle(event);
eventMetrics.recordEventProcessed(eventType, observer.getObserverName());
eventMetrics.recordProcessingTime(sample, eventType, observer.getObserverName());
} catch (Exception e) {
eventMetrics.recordEventFailed(eventType, observer.getObserverName());
throw e;
}
}
}
}

# 6.2 告警规则配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# application.yml
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
metrics:
export:
prometheus:
enabled: true

# 告警规则配置
event:
alert:
# 事件处理失败率告警
failure-rate-threshold: 0.05 # 5%

# 事件处理延迟告警
processing-time-threshold: 5000 # 5秒

# 事件积压告警
backlog-threshold: 1000 # 1000个事件

# 观察者离线告警
observer-offline-threshold: 300 # 5分钟

# 6.3 运营 Dashboard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 事件统计服务
@Service
@Slf4j
public class EventStatisticsService {

@Autowired
private EventStore eventStore;

@Autowired
private MeterRegistry meterRegistry;

/**
* 获取事件处理统计
*/
public EventProcessingStatistics getProcessingStatistics(LocalDateTime from, LocalDateTime to) {
EventProcessingStatistics stats = new EventProcessingStatistics();

// 获取总事件数
long totalEvents = getTotalEvents(from, to);
stats.setTotalEvents(totalEvents);

// 获取处理成功数
long processedEvents = getProcessedEvents(from, to);
stats.setProcessedEvents(processedEvents);

// 获取处理失败数
long failedEvents = getFailedEvents(from, to);
stats.setFailedEvents(failedEvents);

// 计算成功率
double successRate = totalEvents > 0 ? (double) processedEvents / totalEvents : 0.0;
stats.setSuccessRate(successRate);

// 获取平均处理时间
double avgProcessingTime = getAverageProcessingTime(from, to);
stats.setAverageProcessingTime(avgProcessingTime);

return stats;
}

/**
* 获取观察者处理统计
*/
public List<ObserverStatistics> getObserverStatistics(LocalDateTime from, LocalDateTime to) {
List<ObserverStatistics> observerStats = new ArrayList<>();

// 获取所有观察者
List<String> observerNames = getAllObserverNames();

for (String observerName : observerNames) {
ObserverStatistics stats = new ObserverStatistics();
stats.setObserverName(observerName);

// 获取该观察者的处理统计
long processedCount = getObserverProcessedCount(observerName, from, to);
stats.setProcessedCount(processedCount);

long failedCount = getObserverFailedCount(observerName, from, to);
stats.setFailedCount(failedCount);

double avgProcessingTime = getObserverAverageProcessingTime(observerName, from, to);
stats.setAverageProcessingTime(avgProcessingTime);

observerStats.add(stats);
}

return observerStats;
}

// 其他统计方法...
}

# 七、总结与延伸

# 7.1 核心观点提炼

  1. 松耦合是核心价值:观察者模式的最大价值在于实现了发布者和订阅者的完全解耦,使得系统可以独立演进和扩展。

  2. 事件驱动是架构趋势:在微服务架构中,事件驱动已经成为主流架构模式,观察者模式是其基础实现。

  3. 异步处理提升性能:通过异步事件处理,可以显著提升系统的响应速度和吞吐量。

  4. 可靠性需要额外设计:事件的重试、持久化、监控等机制是保证系统可靠性的关键。

# 7.2 技术展望

与消息队列的结合

  • 观察者模式适用于进程内通信,而消息队列适用于跨进程通信
  • 可以将观察者模式作为消息队列的客户端,实现统一的事件处理机制

与 CQRS 模式的结合

  • 命令端使用观察者模式发布领域事件
  • 查询端订阅事件更新读模型
  • 实现读写分离的最终一致性

与 Event Sourcing 的结合

  • 将所有状态变更事件存储到事件存储中
  • 通过事件重放重建系统状态
  • 实现完整的审计日志和时间旅行功能

# 7.3 适用场景总结

适合使用观察者模式的场景

  • 一个对象的变更需要同时影响多个其他对象
  • 需要动态添加或删除观察者
  • 观察者之间需要相互独立,互不影响
  • 需要实现松耦合的系统架构

不适合使用观察者模式的场景

  • 事件处理顺序要求非常严格
  • 需要确保所有观察者都成功处理事件
  • 事件处理逻辑非常简单,直接调用更合适
  • 系统对性能要求极高,事件开销无法接受

观察者模式作为行为型模式的典型代表,在事件驱动架构中发挥着重要作用。通过合理的设计和实现,可以构建出高内聚、低耦合、易扩展的现代化系统架构。