「生命不息,折腾不止」
观察者模式定义
观察者模式是一种对象行为模式。它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。在观察者模式中,主体是通知的发布者,它发出通知时并不需要知道谁是它的观察者,可以有任意数目的观察者订阅并接收通知。
消息中间件与观察者模式
我们使用消息队列时,对于消息发送者来说,并不需要知道谁订阅了,只需要发送消息即可,对于消息接收者来说,可以订阅消息,也可以取消订阅,他们之间不存在耦合关系,所以我们使用消息队列来解耦系统,消息发送者和接收者之间,他们是一对多的关系,基于这个思想,我们就可以使用观察者模式来模拟消息队列。
观察者模式实现消息队列功能
下面我们使用观察者模式来实现下单过程,下单后,我们需要保存订单信息,扣减库存,短信通知消费者,对于这种涉及多个操作,我们一般需要异步执行, 所以一般将消息投放到MQ里面,然后各消费者进行消费,不过有些系统不想引入这种中间件,那么就可以考虑使用观察者模式来实现。
1.定义一个抽象类Subject
,Subject
我们称为主题,里面有subscribe()
,unsubscribe()
,notify()
三个方法。
public abstract class Subject<E> {
//订阅
abstract Result subscribe(Observer<E> orderObserver);
//取消订阅
abstract Result unsubscribe(Observer<E> orderObserver);
//通知
abstract void notify(E event);
}
2.统一返回结构
代码语言:javascript复制@Data
@Builder
public class Result {
int code;
String msg;
Object data;
}
3.定义订单实现类OrderSubject
,OrderSubject
是具体主题,他继承了Subject
,subscribe()
中保存订阅者,unsubscribe()
取消订阅,将订阅者实例从集合中移除,notify()
是通知订阅者。
public class OrderSubject<E> extends Subject<E> {
private List<Observer<E>> observers = new ArrayList<>();
@Override
Result subscribe(Observer<E> observer) {
if (this.observers.contains(observer))
return Result.builder().code(200).msg("已经订阅过").build();
this.observers.add(observer);
return Result.builder().code(200).msg("订阅成功").build();
}
@Override
Result unsubscribe(Observer<E> orderObserver) {
this.observers.remove(orderObserver);
return Result.builder().code(200).msg("已取消订阅").build();
}
@Override
void notify(E event) {
observers.forEach(observer -> {
observer.handEvent(event);
});
}
}
4.定义抽象观察者Observer,里面有一个抽象方法handEvent()
,它的作用是对消息进行处理。
public abstract class Observer<E> {
abstract void handEvent(E event);
}
5.定义具体观察者,有三个具体观察者,分别是短信观察者MessageObserver
,订单观察者OrderObserver
,StockObserver
库存观察者
/**
* 短信观察者
* @param <E>
*/
public class MessageObserver<E> extends Observer<E> {
@Override
void handEvent(E event) {
new Thread(new Runnable() {
@Override
public void run() {
sendMsg(event);
}
}).start();
}
private void sendMsg(E message){
System.out.println("短信通知");
}
}
代码语言:javascript复制/**
* 订单观察者
* @param <E>
*/
public class OrderObserver<E> extends Observer<E> {
@Override
void handEvent(E event){
new Thread(new Runnable() {
@Override
public void run() {
saveOrder(event);
}
}).start();
}
void saveOrder(E message){
System.out.println("保存订单");
}
}
代码语言:javascript复制/**
* 库存观察者
* @param <E>
*/
public class StockObserver<E> extends Observer<E> {
@Override
void handEvent(E event) {
new Thread(new Runnable() {
@Override
public void run() {
deductInventory(event);
}
}).start();
}
private void deductInventory(E o){
System.out.println("扣减库存");
}
}
6.测试
代码语言:javascript复制public class Client {
public static void main(String[] args) {
OrderSubject<Object> orderPublisher = new OrderSubject<>();
StockObserver<Object> stockSubscriber = new StockObserver<>();
OrderObserver<Object> orderSubscriber = new OrderObserver<>();
MessageObserver<Object> messageObserver = new MessageObserver<>();
Result order = orderPublisher.subscribe(orderSubscriber);
Result stock = orderPublisher.subscribe(stockSubscriber);
Result message = orderPublisher.subscribe(messageObserver);
orderPublisher.notify(new OrderInfo("12345678","223344","order"));
}
}
输出:
保存订单
扣减库存
短信通知
上面的短信观察者MessageObserver
,订单观察者OrderObserver
,StockObserver
库存观察者三个是异步执行的,因为 使用了线程,如果不使用线程的话,他们是同步执行的,当然,本处为了演示异步直接使用new Thread()
,但是在实际使用时这样可能不妥,因为这样做的话每个观察者并没有返回值,并不知道是否成功,可能有些异步操作需要将几个接口的返回值进行汇总或者判断,那么我们就可以使用Future
或者CompletableFuture
Future
public class StockObserver<E> extends Observer<E> {
final static ExecutorService executorService = Executors.newCachedThreadPool();
@Override
Future handEvent(E event) {
return executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return deductInventory(event);
}
});
}
private Boolean deductInventory(E o){
return Boolean.TRUE;
}
}
CompletableFuture
public class StockObserver<E> extends Observer<E> {
final static ExecutorService executorService = Executors.newCachedThreadPool();
@Override
CompletableFuture handEvent(E event) {
return CompletableFuture.supplyAsync(() -> {
return deductInventory(event);
});
}
private Boolean deductInventory(E o){
return Boolean.TRUE;
}
}
如上便可以获取异步执行结果。
上面我们就使用观察者模式来模仿消息队列完成了一个下单过程,不过这只是一个代码层面的实现,在实际的分布式系统中,我们还是会使用对应的消息中间件。
观察者模式的优点
观察者和被观察者是耦合的,这样就降低了系统的耦合度。
观察者模式的缺点
如果观察者很多的话,那么每一个都去通知,可能效率就不高,同时也会增加系统的复杂度。
今天的分享就到这里,我们下期再见