rxgo笔记

2019-11-22 09:33:05 浏览数 (2)

使用chan做subject

代码语言:javascript复制
package main

import (
    "fmt"
    "time"

    "github.com/reactivex/rxgo/handlers"
    "github.com/reactivex/rxgo/iterable"
    "github.com/reactivex/rxgo/observable"
)

func main() {
    itChan := make(chan interface{})
    defer close(itChan)

    it, _ := iterable.New(itChan)

    go func() {
        <-observable.From(it).
            Subscribe(handlers.NextFunc(func(v interface{}) {
                if num, ok := v.(int); ok {
                    fmt.Println(num)
                }
            }))
    }()

    itChan <- 1
    time.Sleep(1 * time.Second)
}

扩充原来的operators

包装包装即可,实现了简单的sum, flatmap :

代码语言:javascript复制
package main

import (
    "fmt"

    "github.com/reactivex/rxgo/fx"
    "github.com/reactivex/rxgo/handlers"
    "github.com/reactivex/rxgo/iterable"
    "github.com/reactivex/rxgo/observable"
    "github.com/reactivex/rxgo/observer"
)

type MyObservable struct {
    observable.Observable
}

func (o MyObservable) Sum(apply fx.MappableFunc) MyObservable {
    out := make(chan interface{})
    go func() {
        sum := 0.0
        for item := range o.Observable {
            retItem := apply(item)
            switch v := retItem.(type) {
            case int:
                sum  = float64(v)
            case float64:
                sum  = float64(v)
            }
        }
        out <- sum
        close(out)
    }()
    return MyObservable{observable.Observable(out)}
}

type FlatMappableFunc func(interface{}) observable.Observable

// FlatMap 参考https://github.com/ReactiveX/RxGo/issues/49
func (o MyObservable) FlatMap(apply FlatMappableFunc) MyObservable {
    out := make(chan interface{})
    go func() {
        for item := range o.Observable {
            go func(sub observable.Observable) {
                handler := observer.Observer{
                    NextHandler: func(i interface{}) {
                        out <- i
                    },
                    ErrHandler: func(err error) {
                        out <- err
                    },
                }
                s := sub.Subscribe(handler)
                <-s
            }(apply(item))
        }
        close(out)
    }()
    return MyObservable{observable.Observable(out)}
}

func ToMyObv(obv interface{}) MyObservable {
    switch v := obv.(type) {
    case observable.Observable:
        return MyObservable{v}
    case iterable.Iterable:
        return MyObservable{observable.From(v)}
    }
    return MyObservable{}
}

func main() {

    it, _ := iterable.New([]interface{}{1, 2, 3, 4, 5})
    <-ToMyObv(it).
        Sum(func(v interface{}) interface{} {
            return v
        }).
        Map(func(v interface{}) interface{} {
            return v.(float64)   1
        }).
        Subscribe(handlers.NextFunc(func(v interface{}) {
            fmt.Println(v)
        }))
}

0 人点赞