随着业务的不断发展,原来融入在业务系统主流程中的辅助功能越来越多,每次增加新的逻辑,都要修改主干流程,比如发送微信服务号消息,发送邮件提醒等。 这种做法,让主干流程和辅助功能耦合太紧密,很容易在修改辅助功能的时候,导致主干流程的bug。本文介绍利用事件中心,让主干流程专注于业务核心,其他辅助功能会通过监听事件中心来实现,大大解耦了核心业务和辅助逻辑。
实现
- 事件中心
EventHub
事件中心,一般用来监听收集各种事件并分发给监听者列表.
代码语言:javascript复制/**
* 事件中心,一般用来监听收集各种事件并分发给监听者列表.
*
* @author tenmao
* @since 2019/12/12
*/
@Slf4j
public class EventHub<Event, Entity> {
private final MdcThreadPoolExecutor mdcThreadPoolExecutor = MdcThreadPoolExecutor.newWithInheritedMdc(4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
/**
* 同步监听(一般可以用在事务一致性场景).
*/
private Map<Event, Set<Consumer<Entity>>> syncListenerMap = new HashMap<>();
/**
* 异步监听.
*/
private Map<Event, Set<Consumer<Entity>>> asyncListenerMap = new HashMap<>();
/**
* 注册同步监听器.
*
* @param event 事件
* @param syncListener 同步监听器
*/
public void subscribeSync(Event event, Consumer<Entity> syncListener) {
Preconditions.checkNotNull(event);
Preconditions.checkNotNull(syncListener);
log.info("subscribeSync: event[{}], listener[{}]", event, syncListener);
addListener(syncListenerMap, event, syncListener);
}
/**
* 注册异步监听器.
*
* @param event 事件
* @param asyncListener 异步监听器
*/
public void subscribeAsyncListener(Event event, Consumer<Entity> asyncListener) {
Preconditions.checkNotNull(event);
Preconditions.checkNotNull(asyncListener);
log.info("subscribeAsyncListener: event[{}], listener[{}]", event, asyncListener);
addListener(asyncListenerMap, event, asyncListener);
}
private synchronized void addListener(Map<Event, Set<Consumer<Entity>>> map, Event event, Consumer<Entity> listener) {
Set<Consumer<Entity>> consumers = map.get(event);
if (consumers == null) {
consumers = new HashSet<>();
map.putIfAbsent(event, consumers);
}
consumers.add(listener);
}
/**
* 发布事件.
*
* @param event 事件
* @param entity 事件实体
*/
public void publishEvent(Event event, Entity entity) {
//订单状态变更
log.info("publishEvent: event[{}], entity[{}]", event, entity);
for (Consumer<Entity> consumer : syncListenerMap.get(event)) {
consumer.accept(entity);
}
for (Consumer<Entity> asyncObserver : asyncListenerMap.get(event)) {
mdcThreadPoolExecutor.execute(() -> {
try {
asyncObserver.accept(entity);
} catch (Exception e) {
log.warn("fail to finish listener for event: event[{}], entity[{}]", event, entity);
}
});
}
}
}
- 为不同类型事件配置事件中心
/**
* @author tenmao
* @since 2019/12/12
*/
@Configuration
public class EventHubConfiguration {
/**
* 订单状态事件中心.
*/
@Bean
public EventHub<OrderStatus, Order> orderEventHub() {
return new EventHub<>();
}
/**
* 退款状态事件中心.
*/
@Bean
public EventHub<RefundStatus, Refund> refundEventHub() {
return new EventHub<>();
}
}
事件监听者
代码语言:javascript复制/**
* 发送提醒消息.
*
* @author tenmao
* @since 2019/12/12
*/
@Slf4j
@Component
public class EmailListener {
@Resource
private EmailRemoteManager EmailRemoteManager;
@Resource
private EventHub<OrderStatus, Order> orderEventHub;
@PostConstruct
private void init() {
//监听多个不同的事件,监听到后会发送提醒消息
orderEventHub.subscribeAsyncListener(OrderStatus.PAYED, this::handleOrderPayed);
orderEventHub.subscribeAsyncListener(OrderStatus.ARRIVAL, this::handleOrderArrival);
}
private void handleOrderPayed(Order order) {
//推送 Email
EmailRequest EmailRequest = EmailRequest.build(order);
EmailRemoteManager.sendEmail(EmailRequest);
}
private void handleOrderArrival(Order order) {
//推送 Email
EmailRequest EmailRequest = EmailRequest.build(order);
EmailRemoteManager.sendEmail(EmailRequest);
}
}
事件源
业务系统,比如订单系统中,会在不同的订单状态时,发送相应的订单事件.
代码语言:javascript复制@Slf4j
@Component
public class OrderManager {
@Resource
private EventHub<OrderStatus, Order> orderEventHub;
public void payOrder(String orderNo) {
//TODO 订单付款
Order order = getOne(orderNo);
orderEventHub.publishEvent(OrderStatus.PAYED, order);
}
public void orderArrival(String orderNo) {
//TODO 订单到货
Order order = getOne(orderNo);
orderEventHub.publishEvent(OrderStatus.ARRIVAL, order);
}
//TODO 还有其他事件
}