使用chan做subject
package main
import (
"fmt"
"time"
"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/iterable"
"github.com/reactivex/rxgo/observable"
)
// main demonstrates using a plain channel as a subject: values pushed into
// itChan are delivered to a subscriber running in a separate goroutine, which
// prints every value that is an int.
func main() {
	itChan := make(chan interface{})
	defer close(itChan)

	it, err := iterable.New(itChan)
	if err != nil {
		// Without a valid iterable there is nothing to subscribe to.
		fmt.Println("creating iterable:", err)
		return
	}

	go func() {
		// Block this goroutine until the subscription finishes.
		<-observable.From(it).
			Subscribe(handlers.NextFunc(func(v interface{}) {
				if num, ok := v.(int); ok {
					fmt.Println(num)
				}
			}))
	}()

	itChan <- 1
	// Give the subscriber goroutine time to print before main returns.
	time.Sleep(1 * time.Second)
}
扩充原来的operators
包装一下即可，下面实现了简单的 Sum 和 FlatMap：
package main
import (
"fmt"
"github.com/reactivex/rxgo/fx"
"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/iterable"
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
)
// MyObservable wraps observable.Observable by embedding it, so that extra
// operators (such as Sum and FlatMap) can be defined as methods while the
// embedded type's own methods remain callable on the wrapper.
type MyObservable struct {
observable.Observable
}
// Sum passes every item emitted by o through apply and, once o completes,
// emits the total of the results as a single float64 before closing the
// returned observable.
//
// Only int and float64 results contribute to the total; results of any other
// type are skipped.
func (o MyObservable) Sum(apply fx.MappableFunc) MyObservable {
	out := make(chan interface{})
	go func() {
		defer close(out)

		sum := 0.0
		for item := range o.Observable {
			switch v := apply(item).(type) {
			case int:
				// Accumulate; plain assignment would keep only the last item.
				sum += float64(v)
			case float64:
				sum += v
			}
		}
		out <- sum
	}()
	return MyObservable{observable.Observable(out)}
}
// FlatMappableFunc maps a single item to an observable.Observable.
type FlatMappableFunc func(interface{}) observable.Observable
// FlatMap calls apply on every item emitted by o, subscribes to each
// observable it returns, and forwards their items (and their errors, as plain
// values) onto a single output observable.
//
// Inner observables are drained concurrently, so values coming from different
// inner observables may interleave. The output is closed only after the source
// has completed and every inner subscription has finished; closing it any
// earlier would let a still-running handler send on a closed channel and
// panic. A nil source is treated as already completed.
//
// See https://github.com/ReactiveX/RxGo/issues/49.
func (o MyObservable) FlatMap(apply FlatMappableFunc) MyObservable {
	out := make(chan interface{})
	go func() {
		defer close(out)

		finished := make(chan struct{}) // one signal per completed inner subscription
		pending := 0                    // inner subscriptions still running
		source := o.Observable

		for source != nil || pending > 0 {
			select {
			case item, ok := <-source:
				if !ok {
					// Source is done; a nil channel is never selected again.
					source = nil
					continue
				}
				pending++
				go func(sub observable.Observable) {
					handler := observer.Observer{
						NextHandler: func(i interface{}) {
							out <- i
						},
						ErrHandler: func(err error) {
							out <- err
						},
					}
					// Wait for the inner subscription to finish, then report it.
					<-sub.Subscribe(handler)
					finished <- struct{}{}
				}(apply(item))
			case <-finished:
				pending--
			}
		}
	}()
	return MyObservable{observable.Observable(out)}
}
// ToMyObv converts obv into a MyObservable.
//
// An observable.Observable is wrapped directly; an iterable.Iterable is first
// turned into an observable with observable.From. Any other value yields the
// zero MyObservable.
func ToMyObv(obv interface{}) MyObservable {
	if src, ok := obv.(observable.Observable); ok {
		return MyObservable{src}
	}
	if it, ok := obv.(iterable.Iterable); ok {
		return MyObservable{observable.From(it)}
	}
	return MyObservable{}
}
func main() {
it, _ := iterable.New([]interface{}{1, 2, 3, 4, 5})
<-ToMyObv(it).
Sum(func(v interface{}) interface{} {
return v
}).
Map(func(v interface{}) interface{} {
return v.(float64) 1
}).
Subscribe(handlers.NextFunc(func(v interface{}) {
fmt.Println(v)
}))
}