四、观察者模式

2022-09-21 09:58:50 浏览数 (2)

观察者模式

定义

在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

实现方式

可分为阻塞、非阻塞,根据业务场景决定使用哪种。

基础版本

IObserver 观察者接口
代码语言:javascript复制
public interface IObserver {

    // 模板模式
    default void updateWraper(Message message) {
        System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
        update(message);
    }

    void update(Message message);
}
ConcreteObserverOne 观察者实现类
代码语言:javascript复制
public class ConcreteObserverOne implements IObserver {
    @Override
    public void update(Message message) {
        System.out.println("观察者One的观察方法被执行,收到消息:"   message.getContent());
    }
}
ConcreteObserverTwo 观察者实现类
代码语言:javascript复制
public class ConcreteObserverTwo implements IObserver {
    @Override
    public void update(Message message) {
        System.out.println("观察者Two的观察方法被执行,收到消息:"   message.getContent());
    }
}
ISubject 发布者接口
代码语言:javascript复制
public interface ISubject {

    /**
     * 注册观察者
     */
    ISubject registerObserver(IObserver observer);

    /**
     * 移除观察者
     */
    ISubject removeObserver(IObserver observer);

    /**
     * 提醒观察者
     */
    void notifyObservers(Message message);
}
ConcreteSubject 发布者实现类-阻塞
代码语言:javascript复制
public class ConcreteSubject implements ISubject {

    // 可以利用spring的DI,就不用注册了
    private List<IObserver> observers = Lists.newArrayList();

    @Override
    public ISubject registerObserver(IObserver observer) {
        observers.add(observer);
        return this;
    }

    @Override
    public ISubject removeObserver(IObserver observer) {
        observers.remove(observer);
        return this;
    }

    @Override
    public void notifyObservers(Message message) {
        observers.forEach(observer -> observer.updateWraper(message));
    }
}
AsyncSubject 发布者实现类-非阻塞
代码语言:javascript复制
public class AsyncSubject implements ISubject {

    public static final int DEFAULT_OBSERVER_THREAD_POLL_SIZE = 5;
    private Executor threadPool;
    // 可以利用spring的DI,就不用注册了
    private List<IObserver> observers;

    public AsyncSubject() {
        observers = Lists.newArrayList();
        threadPool = Executors.newFixedThreadPool(DEFAULT_OBSERVER_THREAD_POLL_SIZE);
    }

    @Override
    public ISubject registerObserver(IObserver observer) {
        observers.add(observer);
        return this;
    }

    @Override
    public ISubject removeObserver(IObserver observer) {
        observers.remove(observer);
        return this;
    }

    @Override
    public void notifyObservers(Message message) {
        observers.forEach(observers -> threadPool.execute(() -> observers.updateWraper(message)));
    }
}
Message 发布者发布的消息
代码语言:javascript复制
@Data
@AllArgsConstructor
public class Message {
    private String content;
}
Application 应用类
代码语言:javascript复制
public class Application {
    public static void main(String[] args) {
        final ISubject subject = new ConcreteSubject();
        subject.registerObserver(new ConcreteObserverOne())
               .registerObserver(new ConcreteObserverTwo());
        subject.notifyObservers(new Message("【同步】传播的消息"));

        System.out.println("—————————分割线—————————");

        final ISubject asyncSubject = new AsyncSubject();
        asyncSubject.registerObserver(new ConcreteObserverOne())
                    .registerObserver(new ConcreteObserverTwo());
        asyncSubject.notifyObservers(new Message("【异步】传播的消息"));
    }
}
运行结果
代码语言:javascript复制
Thread id【1】- 观察者One的观察方法被执行,收到消息:【同步】传播的消息
Thread id【1】- 观察者Two的观察方法被执行,收到消息:【同步】传播的消息
—————————分割线—————————
Thread id【11】- 观察者One的观察方法被执行,收到消息:【异步】传播的消息
Thread id【12】- 观察者Two的观察方法被执行,收到消息:【异步】传播的消息

Eventbus

google封装好的工具类,使用起来非常方便。而且功能强大,可根据消息类型匹配对应的观察者。

ConcreteObserverOne 观察者One
代码语言:javascript复制
public class ConcreteObserverOne {

    @Subscribe
    public void update(String message) {
        System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
        System.out.println("观察者One的观察方法被执行,参数类型为String。收到消息:"   message);
    }
}
ConcreteObserverTwo 观察者Two
代码语言:javascript复制
public class ConcreteObserverTwo {

    @Subscribe
    public void update(Message message) {
        System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
        System.out.println("观察者Two的观察方法被执行,参数类型为Message。收到消息:"   message.getContent());
    }
}
Application 应用类
代码语言:javascript复制
public class Application {

    private static final int DEFAULT_EVENTBUS_THREAD_POLL_SIZE = 10;

    public static void main(String[] args) {

        // 同步
        final EventBus eventBus = new EventBus();
        eventBus.register(new ConcreteObserverOne());
        eventBus.register(new ConcreteObserverTwo());
        eventBus.post("这是一条同步的消息,类型为String");
        eventBus.post(new Message("这是一条同步的消息,类型为Message"));

        // 异步
        final AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POLL_SIZE));
        asyncEventBus.register(new ConcreteObserverOne());
        asyncEventBus.register(new ConcreteObserverTwo());
        asyncEventBus.post("这是一条异步的消息,类型为String");
        asyncEventBus.post(new Message("这是一条异步的消息,类型为Message"));
    }
}
运行结果
代码语言:javascript复制
Thread id【1】- 观察者One的观察方法被执行,参数类型为String。收到消息:这是一条同步的消息,类型为String
Thread id【1】- 观察者Two的观察方法被执行,参数类型为Message。收到消息:这是一条同步的消息,类型为Message
Thread id【12】- 观察者One的观察方法被执行,参数类型为String。收到消息:这是一条异步的消息,类型为String
Thread id【13】- 观察者Two的观察方法被执行,参数类型为Message。收到消息:这是一条异步的消息,类型为Message

优点

  • 将观察者与被观察者解耦,使其达到低耦合的目的,提升扩展性。
  • 开闭原则。 无需修改发布者代码就能引入新的订阅者类
  • 可以在运行时建立对象之间的联系

与其他模式的关系

责任链模式

从实现上来看,两者都是由调用者发出消息,多个节点处理。但侧重点不同,责任链模式侧重对消息的处理,反馈给调用者。而观察者侧重的是收到消息后,执行的一些逻辑。从使用方式来看,如果将观察者的异步处理运用到责任链上,你觉得合适吗?

0 人点赞