kotlin--使用观察者、装饰模式实现响应式风格架构

2021-12-16 10:12:19 浏览数 (2)

RxJava就是一种响应式编程框架,利用观察者、装饰模式来实现上下流消息通信和函数式编程,解决了上下流解耦、回调地狱等,其中的思想很值得我们学习,今天来模拟实现RxJava的架构
一、观察者与被观察者

观察者与被观察者的思想就是观察者持有被观察者,当观察者发生变化时,回调被观察者的函数

1.创建观察者接口
代码语言:javascript复制
/**
 * 观察者
 */
interface Observer<T> {
    /**
     * 被观察者发送消息
     */
    fun onNext(obj: T)

    /**
     * 订阅成功
     */
    fun onSubscribe()

    fun onError(throwable: Throwable)

    fun onComplete()
}
2.创建被观察者接口

提供一个订阅方法,让观察者订阅

代码语言:javascript复制
/**
 * 被观察者
 */
interface Observable<T> {
    /**
     * 观察者订阅
     */
    fun subscribe(observer: Observer<T>)
}

为了方便拓展,再封装一层

代码语言:javascript复制
abstract class ObservableProxy<T> : Observable<T> {
    override fun subscribe(observer: Observer<T>) {
        subscribeProxy(observer)
    }

    protected abstract fun subscribeProxy(observer: Observer<T>)
}
3.实现被观察者接口
代码语言:javascript复制
/**
 * 实现生成一个被观察者
 */
class ObservableCreate<T>(private val observable: Observable<T>) : ObservableProxy<T>() {

    override fun subscribeProxy(observer: Observer<T>) {
        // 观察者与被观察者建立关系
        observable.subscribe(observer)
        // 回调下订阅成功函数
        observer.onSubscribe()
    }
}

到这里我们基本实现了观察者模式,使用方法如下:

代码语言:javascript复制
public class Test {
    public static void main(String[] args) {
        new ObservableCreate<String>(new ObservableProxy<String>() {
            @Override
            protected void subscribeProxy(Observer<String> observer) {
                observer.onNext("hello");
                observer.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onNext(String obj) {
                System.out.println(obj);
            }

            @Override
            public void onSubscribe() {
                System.out.println("onSubscribe");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    }
}

目前的类图如下:

二、封装被观察者

上面代码在创建订阅关系时,直接创建了一个被观察者类的匿名实现,我们可以继续对观察者进行封装,不对外暴露被观察者

1.定义发射器接口

内部使用发射器,来替换直接调用观察者的方法,发射器拥有和观察者相同的一部分方法

代码语言:javascript复制
/**
 * 发射器,用于给观察者发送消息
 */
interface Emitter<T> {
    /**
     * 被观察者发送消息
     */
    fun onNext(obj: T)

    fun onError(throwable: Throwable)

    fun onComplete()
}
2.定义发射器工厂

传入观察者,返回一个发射器

代码语言:javascript复制
/**
 * 发射器工厂类
 */
object EmitterFactory {
    fun <T> create(observer: Observer<T>): Emitter<T> {
        return object : Emitter<T> {
            override fun onNext(obj: T) {
                observer.onNext(obj)
            }

            override fun onError(throwable: Throwable) {
                observer.onError(throwable)
            }

            override fun onComplete() {
                observer.onComplete()
            }
        }
    }
}
3.暴露一个接口给外部提供发射器
代码语言:javascript复制
interface ObservableOnSubscribe<T> {
    fun subscribe(emitter: Emitter<T>?)
}
4.修改生成被观察者类
代码语言:javascript复制
/**
 * 实现生成一个被观察者
 */
class ObservableCreate<T>(private val observableOnSubscribe: ObservableOnSubscribe<T>) : ObservableProxy<T>() {

    override fun subscribeProxy(observer: Observer<T>) {
        // 观察者与被观察者建立关系,外部通过发射器向被观察者发送消息
        observableOnSubscribe.subscribe(EmitterFactory.create(observer))
        // 回调下订阅成功函数
        observer.onSubscribe()
    }
}

外部调用的代码修改为:

代码语言:javascript复制
public class Test {
    public static void main(String[] args) {
        new ObservableCreate<String>(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(Emitter<String> emitter) {
                emitter.onNext("hello");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onNext(String obj) {
                System.out.println(obj);
            }

            @Override
            public void onSubscribe() {
                System.out.println("onSubscribe");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    }
}

目前类图如下:

三、装饰器模式

目前上流被观察者发送的数据和下流观察者接收的数据是相同类型的,在实际开发中,很可能会对该数据类型进行转换,我们不希望在观察者中进行转换,因为这样的代码结构显得不优雅,而且其转换过程可能是异步的,那么如何在上流就进行转换?

1.定义数据转换接口
代码语言:javascript复制
interface DataMapper<from, to> {
    fun change(from: from): to
}
2.定义装饰类

有两个成员变量

  • 原有的被观察者
  • 数据转换接口实现类

实例化一个观察者,对原来的被观察者进行订阅,并在该观察者方法中使用数据转换函数后,调用外部传入的观察者的方法

代码语言:javascript复制
/**
 * 转换后新的被观察者,就是将原来的被观察者装饰了下
 */
class DecorateObservable<from, to>(
    val observable: Observable<from>,//转换前的被观察者
    private val observableMapper: DataMapper<from, to>//转换函数
) : ObservableProxy<to>() {//继承至被观察者抽象类

    override fun subscribeProxy(observer: Observer<to>) {
        //实例化一个发送from数据的观察者
        observable.subscribe(object : Observer<from> {
            override fun onNext(obj: from) {
                // 数据转换后,通过发送to数据的观察者发送
                observer.onNext(observableMapper.change(obj))
            }

            override fun onSubscribe() {
                observer.onSubscribe()
            }

            override fun onError(throwable: Throwable) {
                observer.onError(throwable)
            }

            override fun onComplete() {
                observer.onComplete()
            }
        })
    }
}
3.为被观察者抽象类定义一个转换函数
代码语言:javascript复制
abstract class ObservableProxy<T> : Observable<T> {
    override fun subscribe(observer: Observer<T>) {
        subscribeProxy(observer)
    }

    protected abstract fun subscribeProxy(observer: Observer<T>)

    fun <to> map(dataMapper: DataMapper<T, to>): DecorateObservable<T, to> {
        // 装饰当前对象,返回一个新的被观察者
        return DecorateObservable(this, dataMapper)
    }
}

外部使用代码如下:

代码语言:javascript复制
public class Test {
    public static void main(String[] args) {
        new ObservableCreate<String>(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(Emitter<String> emitter) {
                emitter.onNext("hello");
            }
        }).map(new DataMapper<String, Integer>() {

            @Override
            public Integer change(String s) {
                return 123;
            }
        }).subscribe(new Observer<Integer>() {//对新的被观察者进行订阅
            @Override
            public void onNext(Integer obj) {
                System.out.println(obj);
            }

            @Override
            public void onSubscribe() {
                System.out.println("onSubscribe");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    }
}

目前类图如下:

四、线程调度

目前发送数据和接收数据处于同一个线程中,如果想要使得上流发送数据在子线程,只需要包装被观察者的订阅方法,订阅方法目前在下面的地方调用:

它们都继承至ObservableProxy,所以只需要在ObservableProxy中定义一个线程调度的方法,并在这两个地方调用即可

1.上下流线程调度

由于Java中没有Looper,所以如果不指定下流使用子线程,那么上下流将会在同一线程中执行,在抽象类中,定义两个变量,来表示上流和下流是否使用线程 这边偷个懒,最好还是把线程调度方法单独抽象出来

代码语言:javascript复制
abstract class ObservableProxy<T> : Observable<T> {
    //上流是否使用线程
    private var subUseDispatcher: Boolean = false

    //下流是否使用线程
    private var observerUseDispatcher: Boolean = false

    override fun subscribe(observer: Observer<T>) {
        subscribeProxy(observer)
    }

    protected abstract fun subscribeProxy(observer: Observer<T>)

    /**
     * 返回一个新的to类型被观察者
     */
    fun <to> map(dataMapper: DataMapper<T, to>): ObservableProxy<to> {
        // 装饰当前对象,返回一个新的被观察者
        return DecorateObservable(this, dataMapper)
    }

    /**
     * 配置当前被观察者是否使用子线程
     */
    fun subscribeOnThread(useDispatcher: Boolean): ObservableProxy<T> {
        this.subUseDispatcher = useDispatcher
        //往上设置被观察者
        if (this is DecorateObservable<*, *>) {
            (this as DecorateObservable<*, *>).observable.subscribeOnThread(useDispatcher)
        }
        return this
    }


    /**
     * 配置观察者接收是否使用其他线程
     */
    fun observeOnThread(observerUseDispatcher: Boolean): ObservableProxy<T> {
        this.observerUseDispatcher = observerUseDispatcher
        //往上设置被观察者
        if (this is DecorateObservable<*, *>) {
            (this as DecorateObservable<*, *>).observable.observeOnThread(observerUseDispatcher)
        }
        return this
    }

    /**
     * 上流进行线程调度
     */
    fun dispatchSubscribe(command: Runnable) {
        if (subUseDispatcher) {//需要线程执行
            ThreadPoolFactory.getNewThreadPool().execute(command)
        } else {
            command.run()
        }
    }

    /**
     * 下流进行线程调度
     */
    fun dispatchObserve(command: Runnable) {
        if (observerUseDispatcher) {//需要线程执行
            ThreadPoolFactory.getDefaultPool().execute(command)
        } else {
            command.run()
        }
    }
}
2.定义获取线程池的工厂类

这边新建两个线程池,来给分别上下流使用

代码语言:javascript复制
class ThreadPoolFactory {
    companion object {
        // 下流使用的线程
        private val default: ExecutorService by lazy { Executors.newSingleThreadExecutor() }
        // 上流使用的线程
        private val newThread: ExecutorService by lazy { Executors.newCachedThreadPool() }

        fun getDefaultPool(): ExecutorService {
            return default
        }

        fun getNewThreadPool(): ExecutorService {
            return newThread
        }
    }
}
3.在最初的生成被观察者类中使用线程调度

对应第一张图片的代码处,使用线程调度执行订阅方法,

代码语言:javascript复制
/**
 * 实现生成一个被观察者
 */
class ObservableCreate<T>(private val observableOnSubscribe: ObservableOnSubscribe<T>) : ObservableProxy<T>() {

    override fun subscribeProxy(observer: Observer<T>) {
        // 观察者与被观察者建立关系,外部通过发射器向被观察者发送消息
        dispatchSubscribe {//上流
            observableOnSubscribe.subscribe(EmitterFactory.create(observer, this))
        }
        // 回调下订阅成功函数
        dispatchObserve {//下流
            observer.onSubscribe()
        }
    }
}
4.对调用观察者发送消息的地方加上线程调度

一个是创建发射器时:

代码语言:javascript复制
object EmitterFactory {
    fun <T> create(observer: Observer<T>, observable: ObservableProxy<T>): Emitter<T> {
        return object : Emitter<T> {
            override fun onNext(obj: T) {
                observable.dispatchObserve {
                    observer.onNext(obj)
                }
            }

            override fun onError(throwable: Throwable) {
                observable.dispatchObserve {
                    observer.onError(throwable)
                }
            }

            override fun onComplete() {
                observable.dispatchObserve {
                    observer.onComplete()
                }
            }
        }
    }
}

一个是数据转换时:

代码语言:javascript复制
class DecorateObservable<from, to>(
    observable: ObservableProxy<from>,//转换前的被观察者
    private val observableMapper: DataMapper<from, to>//转换函数
) : AbstractDecorateObservable<from, to>(observable) {//继承至观察者装饰抽象类

    override fun subscribeProxy(observer: Observer<to>) {
        //实例化一个发送from数据的观察者
        observable.subscribe(object : Observer<from> {
            override fun onNext(obj: from) {
                // 数据转换后,通过发送to数据的观察者发送
                val change = observableMapper.change(obj)
                dispatchObserve {
                    observer.onNext(change)
                }
            }

            override fun onSubscribe() {
                dispatchObserve {
                    observer.onSubscribe()
                }
            }

            override fun onError(throwable: Throwable) {
                dispatchObserve {
                    observer.onError(throwable)
                }
            }

            override fun onComplete() {
                dispatchObserve {
                    observer.onComplete()
                }
            }
        })
    }
}

修改完后测试下代码:

代码语言:javascript复制
public class Test {
    public static void main(String[] args) {
        new ObservableCreate<String>(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(Emitter<String> emitter) {
                System.out.println("subscribe thread:"   Thread.currentThread());
                emitter.onNext("hello");
            }
        }).map(new DataMapper<String, Integer>() {

            @Override
            public Integer change(String s) {
                System.out.println("change thread:"   Thread.currentThread());
                return 123;
            }
        }).subscribeOnThread(true).observeOnThread(true).subscribe(new Observer<Integer>() {//对新的被观察者进行订阅
            @Override
            public void onNext(Integer obj) {
                System.out.println("onNext thread:"   Thread.currentThread());
                System.out.println(obj);
            }

            @Override
            public void onSubscribe() {
                System.out.println("onSubscribe");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

        //不使用转换
//        new ObservableCreate<String>(new ObservableOnSubscribe<String>() {
//            @Override
//            public void subscribe(Emitter<String> emitter) {
//                System.out.println("subscribe thread:"   Thread.currentThread());
//                emitter.onNext("hello");
//            }
//        }).subscribeOnThread(true).observeOnThread(true).subscribe(new Observer<String>() {
//            @Override
//            public void onNext(String obj) {
//                System.out.println("onNext thread:"   Thread.currentThread());
//                System.out.println(obj);
//            }
//
//            @Override
//            public void onSubscribe() {
//
//            }
//
//            @Override
//            public void onError(Throwable throwable) {
//
//            }
//
//            @Override
//            public void onComplete() {
//
//            }
//        });
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

结果: subscribe thread:Thread[pool-1-thread-1,5,main] change thread:Thread[pool-2-thread-1,5,main] onSubscribe onNext thread:Thread[pool-2-thread-1,5,main] 123

项目地址:https://gitee.com/aruba/rx-java-application.git

0 人点赞