观察者模式
定义
在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。
实现方式
可分为阻塞、非阻塞,根据业务场景决定使用哪种。
基础版本
IObserver 观察者接口
代码语言:javascript复制public interface IObserver {
// 模板模式
default void updateWraper(Message message) {
System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
update(message);
}
void update(Message message);
}
ConcreteObserverOne 观察者实现类
代码语言:javascript复制public class ConcreteObserverOne implements IObserver {
@Override
public void update(Message message) {
System.out.println("观察者One的观察方法被执行,收到消息:" message.getContent());
}
}
ConcreteObserverTwo 观察者实现类
代码语言:javascript复制public class ConcreteObserverTwo implements IObserver {
@Override
public void update(Message message) {
System.out.println("观察者Two的观察方法被执行,收到消息:" message.getContent());
}
}
ISubject 发布者接口
代码语言:javascript复制public interface ISubject {
/**
* 注册观察者
*/
ISubject registerObserver(IObserver observer);
/**
* 移除观察者
*/
ISubject removeObserver(IObserver observer);
/**
* 提醒观察者
*/
void notifyObservers(Message message);
}
ConcreteSubject 发布者实现类-阻塞
代码语言:javascript复制public class ConcreteSubject implements ISubject {
// 可以利用spring的DI,就不用注册了
private List<IObserver> observers = Lists.newArrayList();
@Override
public ISubject registerObserver(IObserver observer) {
observers.add(observer);
return this;
}
@Override
public ISubject removeObserver(IObserver observer) {
observers.remove(observer);
return this;
}
@Override
public void notifyObservers(Message message) {
observers.forEach(observer -> observer.updateWraper(message));
}
}
AsyncSubject 发布者实现类-非阻塞
代码语言:javascript复制public class AsyncSubject implements ISubject {
public static final int DEFAULT_OBSERVER_THREAD_POLL_SIZE = 5;
private Executor threadPool;
// 可以利用spring的DI,就不用注册了
private List<IObserver> observers;
public AsyncSubject() {
observers = Lists.newArrayList();
threadPool = Executors.newFixedThreadPool(DEFAULT_OBSERVER_THREAD_POLL_SIZE);
}
@Override
public ISubject registerObserver(IObserver observer) {
observers.add(observer);
return this;
}
@Override
public ISubject removeObserver(IObserver observer) {
observers.remove(observer);
return this;
}
@Override
public void notifyObservers(Message message) {
observers.forEach(observers -> threadPool.execute(() -> observers.updateWraper(message)));
}
}
Message 发布者发布的消息
代码语言:javascript复制@Data
@AllArgsConstructor
public class Message {
private String content;
}
Application 应用类
代码语言:javascript复制public class Application {
public static void main(String[] args) {
final ISubject subject = new ConcreteSubject();
subject.registerObserver(new ConcreteObserverOne())
.registerObserver(new ConcreteObserverTwo());
subject.notifyObservers(new Message("【同步】传播的消息"));
System.out.println("—————————分割线—————————");
final ISubject asyncSubject = new AsyncSubject();
asyncSubject.registerObserver(new ConcreteObserverOne())
.registerObserver(new ConcreteObserverTwo());
asyncSubject.notifyObservers(new Message("【异步】传播的消息"));
}
}
运行结果
代码语言:javascript复制Thread id【1】- 观察者One的观察方法被执行,收到消息:【同步】传播的消息
Thread id【1】- 观察者Two的观察方法被执行,收到消息:【同步】传播的消息
—————————分割线—————————
Thread id【11】- 观察者One的观察方法被执行,收到消息:【异步】传播的消息
Thread id【12】- 观察者Two的观察方法被执行,收到消息:【异步】传播的消息
Eventbus
google封装好的工具类,使用起来非常方便。而且功能强大,可根据消息类型匹配对应的观察者。
ConcreteObserverOne 观察者One
代码语言:javascript复制public class ConcreteObserverOne {
@Subscribe
public void update(String message) {
System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
System.out.println("观察者One的观察方法被执行,参数类型为String。收到消息:" message);
}
}
ConcreteObserverTwo 观察者Two
代码语言:javascript复制public class ConcreteObserverTwo {
@Subscribe
public void update(Message message) {
System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
System.out.println("观察者Two的观察方法被执行,参数类型为Message。收到消息:" message.getContent());
}
}
Application 应用类
代码语言:javascript复制public class Application {
private static final int DEFAULT_EVENTBUS_THREAD_POLL_SIZE = 10;
public static void main(String[] args) {
// 同步
final EventBus eventBus = new EventBus();
eventBus.register(new ConcreteObserverOne());
eventBus.register(new ConcreteObserverTwo());
eventBus.post("这是一条同步的消息,类型为String");
eventBus.post(new Message("这是一条同步的消息,类型为Message"));
// 异步
final AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POLL_SIZE));
asyncEventBus.register(new ConcreteObserverOne());
asyncEventBus.register(new ConcreteObserverTwo());
asyncEventBus.post("这是一条异步的消息,类型为String");
asyncEventBus.post(new Message("这是一条异步的消息,类型为Message"));
}
}
运行结果
代码语言:javascript复制Thread id【1】- 观察者One的观察方法被执行,参数类型为String。收到消息:这是一条同步的消息,类型为String
Thread id【1】- 观察者Two的观察方法被执行,参数类型为Message。收到消息:这是一条同步的消息,类型为Message
Thread id【12】- 观察者One的观察方法被执行,参数类型为String。收到消息:这是一条异步的消息,类型为String
Thread id【13】- 观察者Two的观察方法被执行,参数类型为Message。收到消息:这是一条异步的消息,类型为Message
优点
- 将观察者与被观察者解耦,使其达到低耦合的目的,提升扩展性。
- 开闭原则。 无需修改发布者代码就能引入新的订阅者类
- 可以在运行时建立对象之间的联系
与其他模式的关系
责任链模式
从实现上来看,两者都是由调用者发出消息,多个节点处理。但侧重点不同,责任链模式侧重对消息的处理,反馈给调用者。而观察者侧重的是收到消息后,执行的一些逻辑。从使用方式来看,如果将观察者的异步处理运用到责任链上,你觉得合适吗?