白话设计模式之观察者模式

2022-07-26 17:02:07 浏览数 (1)

「生命不息,折腾不止」

观察者模式定义

观察者模式是一种对象行为模式。它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。在观察者模式中,主体是通知的发布者,它发出通知时并不需要知道谁是它的观察者,可以有任意数目的观察者订阅并接收通知。

消息中间件与观察者模式

我们使用消息队列时,对于消息发送者来说,并不需要知道谁订阅了,只需要发送消息即可,对于消息接收者来说,可以订阅消息,也可以取消订阅,他们之间不存在耦合关系,所以我们使用消息队列来解耦系统,消息发送者和接收者之间,他们是一对多的关系,基于这个思想,我们就可以使用观察者模式来模拟消息队列。

观察者模式实现消息队列功能

下面我们使用观察者模式来实现下单过程,下单后,我们需要保存订单信息,扣减库存,短信通知消费者,对于这种涉及多个操作,我们一般需要异步执行, 所以一般将消息投放到MQ里面,然后各消费者进行消费,不过有些系统不想引入这种中间件,那么就可以考虑使用观察者模式来实现。

1.定义一个抽象类Subject,Subject我们称为主题,里面有subscribe(),unsubscribe(),notify()三个方法。

代码语言:javascript复制
public abstract class Subject<E> {
    //订阅
    abstract Result subscribe(Observer<E> orderObserver);
    //取消订阅
    abstract Result unsubscribe(Observer<E> orderObserver);
    //通知
    abstract void notify(E event);
}

2.统一返回结构

代码语言:javascript复制
@Data
@Builder
public class Result {
    int code;
    String msg;
    Object data;
}

3.定义订单实现类OrderSubjectOrderSubject是具体主题,他继承了Subject,subscribe()中保存订阅者,unsubscribe()取消订阅,将订阅者实例从集合中移除,notify()是通知订阅者。

代码语言:javascript复制
public class OrderSubject<E> extends Subject<E> {
    private List<Observer<E>> observers = new ArrayList<>();
    @Override
    Result subscribe(Observer<E> observer) {
        if (this.observers.contains(observer))
            return Result.builder().code(200).msg("已经订阅过").build();
        this.observers.add(observer);
        return Result.builder().code(200).msg("订阅成功").build();
    }

    @Override
    Result unsubscribe(Observer<E> orderObserver) {
        this.observers.remove(orderObserver);
        return Result.builder().code(200).msg("已取消订阅").build();
    }

    @Override
    void notify(E event) {
        observers.forEach(observer -> {
            observer.handEvent(event);
        });
    }
}

4.定义抽象观察者Observer,里面有一个抽象方法handEvent(),它的作用是对消息进行处理。

代码语言:javascript复制
public abstract class Observer<E> {
    abstract void handEvent(E event);
}

5.定义具体观察者,有三个具体观察者,分别是短信观察者MessageObserver,订单观察者OrderObserverStockObserver库存观察者

代码语言:javascript复制
/**
 * 短信观察者
 * @param <E>
 */
public class MessageObserver<E> extends Observer<E> {
    @Override
    void handEvent(E event) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                sendMsg(event);
            }
        }).start();
    }

    private void sendMsg(E message){
        System.out.println("短信通知");
    }
}
代码语言:javascript复制
/**
 * 订单观察者
 * @param <E>
 */
public class OrderObserver<E> extends Observer<E> {
    @Override
    void handEvent(E event){
        new Thread(new Runnable() {
            @Override
            public void run() {
                saveOrder(event);
            }
        }).start();
    }

    void saveOrder(E message){
        System.out.println("保存订单");
    }
}
代码语言:javascript复制
/**
 * 库存观察者
 * @param <E>
 */
public class StockObserver<E> extends Observer<E> {
    @Override
    void handEvent(E event) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                deductInventory(event);
            }
        }).start();
    }

    private void deductInventory(E o){
        System.out.println("扣减库存");
    }
}

6.测试

代码语言:javascript复制
public class Client {
    public static void main(String[] args) {
        OrderSubject<Object> orderPublisher = new OrderSubject<>();

        StockObserver<Object> stockSubscriber = new StockObserver<>();
        OrderObserver<Object> orderSubscriber = new OrderObserver<>();
        MessageObserver<Object> messageObserver = new MessageObserver<>();

        Result order = orderPublisher.subscribe(orderSubscriber);
        Result stock = orderPublisher.subscribe(stockSubscriber);
        Result message = orderPublisher.subscribe(messageObserver);
        orderPublisher.notify(new OrderInfo("12345678","223344","order"));
    }
}

输出:
    保存订单
    扣减库存
    短信通知

上面的短信观察者MessageObserver,订单观察者OrderObserverStockObserver库存观察者三个是异步执行的,因为 使用了线程,如果不使用线程的话,他们是同步执行的,当然,本处为了演示异步直接使用new Thread(),但是在实际使用时这样可能不妥,因为这样做的话每个观察者并没有返回值,并不知道是否成功,可能有些异步操作需要将几个接口的返回值进行汇总或者判断,那么我们就可以使用Future或者CompletableFuture

Future

代码语言:javascript复制
public class StockObserver<E> extends Observer<E> {
    final static ExecutorService executorService = Executors.newCachedThreadPool();
    
    @Override
    Future handEvent(E event) {
        return executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return deductInventory(event);
            }
        });
    }

    private Boolean deductInventory(E o){
        return Boolean.TRUE;
    }
}

CompletableFuture

代码语言:javascript复制
public class StockObserver<E> extends Observer<E> {
    final static ExecutorService executorService = Executors.newCachedThreadPool();
    
    @Override
    CompletableFuture handEvent(E event) {
        return CompletableFuture.supplyAsync(() -> {
            return deductInventory(event);
        });
    }

    private Boolean deductInventory(E o){
        return Boolean.TRUE;
    }
}

如上便可以获取异步执行结果。

上面我们就使用观察者模式来模仿消息队列完成了一个下单过程,不过这只是一个代码层面的实现,在实际的分布式系统中,我们还是会使用对应的消息中间件。

观察者模式的优点

观察者和被观察者是耦合的,这样就降低了系统的耦合度。

观察者模式的缺点

如果观察者很多的话,那么每一个都去通知,可能效率就不高,同时也会增加系统的复杂度。

今天的分享就到这里,我们下期再见

0 人点赞