一个颇为巧妙的go并发模式:or pattern

2022-01-15 09:08:44 浏览数 (1)

golang由于对多并发的支持,它被广泛采用与后台开发。它特色的routine是一种高效的轻量级线程,采用go语言,再加上它所提供的各种支持高并发的语言机制,使得用它开发可扩展性高的后台服务器变得容易很多。

我使用go一段时间,在最近学习过程中发现一个颇为巧妙的并发模式,因此想总结一下。go的最大特色是goroutine和channel,不同routine之间能通过channel进行数据发送和通讯,假设我们发动了很多个routine用于请求某项服务,只要其中一个完成了,其他routine就可以直接结束,面对这样的需求情况该如何有效的处理呢。

如果routine的数量不多的情况下,例如我们发动了3个routine向远程服务器发出请求,只要有一个回应了,所有routine就可以结束,假设每个routine都返回一个channel,一旦成功请求到远程服务,它就把返回结果写入到它对应的channel中,示例如下:

代码语言:javascript复制
remoteServiceRequest := func(ip string, port int) <-chan string {
    result := make(chan string)
    //请求远程服务,并把请求结果写入到result
    ...
    return result
}

//启动3个并发请求
var serviceRes = make([] <-chan string, 0, 3)
for i := 0; i < 3; i   {
    serviceRes[i] = remoteServiceRequest(remote_ip, remote_port)
}

//等到3个请求,只要其中有一个返回即可
select {
case res0 := <-serviceRes[0]:
       do_something(res0)
 case res1 := <-serviceRes[1]:
     do_something(res1)
 case res2 := <-serviceRes[2]:
     do_something(res2)      
}

在go中,要等待若干个异步请求返回时,就可以使用select模式,问题在于如果请求的数量较少上面模式可行,当请求的数量非常多,例如有十几二十个,或是有上百个时,我们不可能使用几十个case分支,更麻烦在于很多时候我们并不能提前得知确切的异步请求数量,那么当我们希望在实现的目的为:在任意个异步请求发起的情况下,只要其中有一个完成,那么就获取结果,要灵活的实现这个目标该怎么办?这里就可以使用一个设计颇为巧妙的or 模式。

它的基本思路为,假设我们现在要发起一百个异步请求,显然我们不能在select下面分别罗列100个case,万一到时候需要的是150个怎办?但我们可以通过递归的方式实现case的自动化罗列,于是我可以在函数中先考虑只有3个case的情况,假设现在来了6个异步请求,那么我就可以通过函数先处理前3个请求,然后再递归的去处理接下来的3个请求,也就是通过递归的方式来实现任意个case的罗列,我们看看具体例子:

代码语言:javascript复制
package main 

import (
    "fmt"
    "time"
)

func main() {
    var or func(channels ...<-chan interface{}) <-chan interface{}
    /*
    or 函数将多个channel合并在一起,只要其中有一个channel有数据那么就可以返回
    */
    or = func(channels ...<-chan interface{}) <-chan interface{} {
        //如果只有0个或1个channel,那么直接返回
        switch len(channels) {
        case 0:
            return nil 
        case 1:
            return channels[0]
        }

        /*
        如果有3个以上的channel,那么下面先针对前3个channel进行select case,然后递归的对
        余下的channel进行select case
        */
        orDone := make(chan interface{})
        go func() {
            defer close(orDone)
            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                select {
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                //递归对3个之后的channel进行select case
                case <-or(append(channels[3:], orDone)...):
                }
            }
        }()
        return orDone 
    }

    sig := func(after time.Duration, done <-chan interface{}) <-chan interface{} {
        c := make(chan interface{})
        go func() {
            defer close(c)
            select {
            case <-done:
            case <-time.After(after):
            }
        }()
        return c 
    }

    start := time.Now()
    done := make(chan interface{})
    <-or(sig(2*time.Hour, done), sig(5*time.Minute, done), sig(1*time.Second, done), 
         sig(3 * time.Hour, done), sig(1 * time.Minute, done))
    close(done)
    fmt.Printf("done after %v", time.Since(start))
}

我们看看上面代码的基本逻辑,假设我们有6个异步请求返回的channel需要等待,那么把6个channel放入or函数后,前3个会在default下面的select被监控,然后代码创建一个orDone channel,然后将它和余下的3个channel再递归的传入or函数。假设第4个channel最先获得数据,那么递归调用的or函数就会在default部分的select case的阻塞中返回,于是它创建的orDone就会因为”defer close(orDone)”的执行而关闭。

于是在第一次递归时对应的语句”<-or(append(channel[3:], orDone)”就会返回,于是就会触发第一次调用or函数是所创建的orDone被关闭,于是最外层调用or函数时就能返回,如果不是后3个channel有数据,而是前3个channel有数据返回,那么递归前的orDone就会被关闭,这就导致递归调用or函数结束阻塞,于是算法就不用再等待后3个channel有数据返回,由此看来这个模式的设计还是非常巧妙的。

在示例代码中,我们启动了多个时钟,然后sig函数会等待时钟结束,然后把这些时钟对应的channel都放入or函数,里面最短的时钟是1秒,因此or函数会在等待1秒后直接返回,由此可见or模式在高并发场景下还是非常适用的。

0 人点赞