GO 语言的并发模式你了解多少?

2023-10-24 19:30:48 浏览数 (2)

实际上,出现上述的情况,还是因为我们对于 GO 语言的并发模型和涉及的 GO 语言基础不够扎实,误解了语言的用法。

那么,对于 GO 语言的并发模式,我们一起来梳理一波。 GO 语言常见的并发模式有这些:

  1. 创建模式
  2. 退出模式
  3. 管道模式
  4. 超时模式和取消模式

在 GO 语言里面,咱们使用使用并发,自然离不开使用 GO 语言的协程 goroutine,通道 channel 和 多路复用 select,接下来就来看看各种模式都是如何去搭配使用这三个关键原语的

创建模式

使用过通道和协程的朋友对于创建模式肯定不会模式,这是一个非常常用的方式,也是一个非常简单的使用方式:

  1. 主协程中调用 help 函数,返回一个通道 ch 变量
  2. 通道 ch 用于主协程和 子协程之间的通信,其中通道的数据类型完全可以自行定义
代码语言:javascript复制
type XXX struct{...}

func help(fn func()) chan XXX {
    ch := make(chan XXX)
    // 开启一个协程
    go func(){
        // 此处的协程可以控制和外部的 主协程 通过 ch 来进行通信,达到一定逻辑便可以执行自己的 fn 函数
        fn()
        ch <- XXX
    }()
}

func main(){
    ch := help(func(){
        fmt.Println("这是GO 语言 并发模式之 创建模式")
    })
    
    <- ch
}

退出模式

程序的退出我们应该也不会陌生,对于一些常驻的服务,如果是要退出程序,自然是不能直接就断掉,此时会有一些连接和业务并没有关闭,直接关闭程序会导致业务异常,例如在关闭过程中最后一个 http 请求没有正常响应等等等

此时,就需要做优雅关闭了,对于协程 goroutine 退出有 3 种模式

  1. 分离模式
  2. join 模式
  3. notify-and-wait 模式

分离模式

此处的分离模式,分离这个术语实际上是线程中的术语,pthread detached

分离模式可以理解为,咱们创建的协程 goroutine,直接分离,创建子协程的父协程不用关心子协程是如何退出的,子协程的生命周期主要与它执行的主函数有关,咱们 return 之后,子协程也就结束了

对于这类分离模式的协程,咱们需要关注两类,一种是一次性的任务,咱们 go 出来后,执行简单任务完毕后直接退出,一种是常驻程序,需要优雅退出,处理一些垃圾回收的事情

例如这样:

  1. 主程序中设置一个通道变量 ch ,类型为 os.Signal
  2. 然后主程序就开始各种创建协程执行自己的各种业务
  3. 直到程序收到了 syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT 任意一个信号的时候,则会开始进行垃圾回收等清理工作,执行完毕后,程序再进行退出
代码语言:javascript复制
func main(){
     ch := make(chan os.Signal)
     signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
     
     // ...
     // go 程序执行其他业务
     // ...
     
    
    for i := range ch {
        switch i {
        case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
            // 做一些清理工作
            os.Exit(0)
        }
    }
}

join 模式

看到这个关键字,是不是也似曾相识,和线程貌似很像,例如 线程中 父线程可以通过 pthread_join 来等待子线程结束,并且还可以获取子线程的结束状态

GO 语言中等待子协程退出并且获取子协程的退出状态,咱们就可以使用通道 channel 的方式来进行处理

例子1

等待一个子协程退出,并获取退出状态

  1. 主协程中调用 help 方法得到一个 ch 通道变量,主协程阻塞着读 ch
  2. help 中开辟一个子协程去执行传入的 fn 回调函数,并传参为 ok bool
  3. 实际 fn 函数判断传参 ok 是否是 true,若不是则返回具体的错误信息,若是 true 则返回 nil
代码语言:javascript复制
func help(f func(bool) error, ok bool) <-chan error {
 ch := make(chan error)
 go func() {
  ch <- f(ok)
 }()

 return ch
}

func fn(ok bool) error {
 if !ok {
  return errors.New("not ok ... ")
 }

 return nil
}

func main() {
 ch := help(fn, true)
 fmt.Println("help 111")
 err := <-ch
 fmt.Println("help 111 done ", err)

 ch = help(fn, false)
 fmt.Println("help 222")
 err = <-ch
 fmt.Println("help 222 done ", err)
}

看上如上程序,我们就可以知道,第一次调用 help(fn , true) ,主协程等待子协程退出的时候,会得到一个错误信息,为 not ok ... , 第二次调用 help(fn , false) 的时候,返回的 err 是一个 nil

通过上述这种方式,主协程不仅可以轻易的等待一个子协程退出,还可以获取到子协程退出的状态

那么,主协程如果是等待多个协程退出呢?需要如何处理?

例子2

主协程等待多个协程退出咱们就需要使用到 GO 中的 sync.WaitGroup

  1. 使用 help 函数,传入回调函数,参数1 bool,参数2 int ,其中参数 2 表示开辟子协程的个数,返回值为一个无缓冲的 channel 变量,数据类型是 struct{}
  2. 使用 var wg sync.WaitGroup ,开辟子协程的时候记录一次 wg.Add(1),当子协程退出时 ,记录退出 wg.Done()
  3. help 中再另起一个协程 wg.Wait() 等待所有子协程退出,并将 ch 变量写入值
  4. 主协程阻塞读取 ch 变量的值,待所有子协程都退出之后,help 中写入到 ch 中的数据,主协程就能马上收到 ch 中的数据,并退出程序
代码语言:javascript复制
func help(f func(bool)error, ok bool, num int)chan struct{}{
    ch := make(chan struct{})
    
    var wg sync.WaitGroup
    for i:=0; i<num; i   {
        wg.Add(1)
        
        go func(){
            f(ok)
            fmt.Println(" f done ")
            wg.Done()
        }() 
    }
    
    go func(){
        // 等待所有子协程退出
        wg.Wait()
        ch <- struct{}{}
    }()
    
    
    return ch
}

func fn(ok bool) error{

    time.Sleep(time.Second * 1)

    if !ok{
        return errors.New("not ok ... ")
    }
    
    return nil
}


func main(){
    ch := help(fn , true)
    fmt.Println("help 111")
     <- ch 
    fmt.Println("help 111 done ",err)
    
}

notify-and-wait 模式

可以看到上述模式,都是主协程等待一个子协程,或者多个子协程结束后,主协程再进行退出,或者处理完垃圾回收后退出

那么如果主协程要主动通知子协程退出,我们应该要如何处理呢?

同样的问题,如果主协程自己退出了,而没有通知其他子协程退出,这是会导致业务数据异常或者丢失的,那么此刻我们就可以使用到 notify-and-wait 模式 来进行处理

我们就直接来写一个主协程通知并等待多个子协程退出的 demo:

  1. 主协程调用 help 函数,得到一个 quit chan struct{} 类型的通道变量,主协程阻塞读取 quit 的值
  2. help 函数根据传入的参数 num 来创建 num 个子协程,并且使用 sync.WaitGroup 来控制
  3. 当主协程在 quit 通道中写入数据时,主动通知所有子协程退出
  4. help 中的另外一个协程读取到 quit 通道中的数据,便 close 掉 j 通道,触发所有的子协程读取 j 通道值的时候,得到的 ok 为 false,进而所有子协程退出
  5. wg.Wait() 等待所有子协程退出后,再在 quit 中写入数据
  6. 主协程此时从 quit 中读取到数据,则知道所有子协程全部退出,自己的主协程即刻退出
代码语言:javascript复制
func fn(){
   // 模拟在处理业务
   time.Sleep(time.Second * 1)
}

func help(num int, f func()) chan struct{}{
   quit := make(chan struct{})
   j := make(chan int)

   var wg sync.WaitGroup

   // 创建子协程处理业务
   for i:=0;i<num;i  {
      wg.Add(1)
      go func(){
         defer wg.Done()

         _,ok:=<-j
         if !ok{
            fmt.Println("exit child goroutine .")
            return
         }
         // 子协程 正常执行业务
         f()
      }()
   }


   go func(){
      <-quit
      close(j)
      // 等待子协程全部退出
      wg.Wait()

      quit <- struct{}{}

   }()

   return quit
}


func main(){
   quit := help(10, fn)
   // 模拟主程序处理在处理其他事项
   // ...
   time.Sleep(time.Second * 10)

   quit <- struct{}{}

   // 此处等待所有子程序退出
   select{
   case <- quit:
      fmt.Println(" programs exit. ")
   }


}

上述程序执行结果如下,可以看到 help 函数创建了 10 个子协程,主协程主动通知子协程全部退出,退出的时候也是 10 个子协程退出了,主协程才退出

上述程序,如果某一个子协程出现了问题,导致子协程不能完全退出,也就是说某些子协程在 f 函数中阻塞住了,那么这个时候主协程岂不是一直无法退出???

那么此时,在主协程通知子协程退出的时候,我们加上一个超时时间,表达意思为,超过某个时间,如果子协程还没有全部退出完毕,那么主协程仍然主动关闭程序,可以这样写:

  1. 设定一个定时器, 3 秒后会触发,即可以从 t.C 中读取到数据
代码语言:javascript复制
t := time.NewTimer(time.Second * 3)
defer t.Stop()

// 此处等待所有子程序退出
select{
case <-t.C:
   fmt.Println("timeout programs exit. ")
case <- quit:
   fmt.Println(" 111 programs exit. ")
}

管道模式

说到管理,或许大家对 linux 里面的管道更加熟悉吧,例如使用 linux 命令找到文件中的 golang 这个字符串

代码语言:javascript复制
cat xxx.txt |grep "golang"

那么对于 GO 语言并发模式中的管道模式也是类似的效果,我们就可以用这个管道模式来过滤数据

例如我们可以设计这样一个程序,兄弟们可以动起手来写一写,评论区见哦:

  1. 整个程序总共使用 2 个通道
  2. help 函数中传输数据量 50 ,逻辑计算能够被 5 整除的数据写到第一个通道 ch1 中
  3. 另一个协程阻塞读取 ch1 中的内容,并将取出的数据乘以 3 ,将结果写入到 ch2 中
  4. 主协程就阻塞读取 ch2 的内容,读取到内容后,挨个打印出来

管道模式有两种模式,扇出模式 和 扇入模式,这个比较好理解

  1. 扇出模式:多种类型的数据从同一个通道 channel 中读取数据,直到通道关闭
  2. 扇入模式:输入的时候有多个通道channel,程序将所有的通道内数据汇聚,统一输入到另外一个通道channel A 里面,另外一个程序则从这个通道channel A 中读取数据,直到这个通道A关闭为止

超时模式和取消模式化

超时模式

上述例子中有专门说到如何去使用他,实际上我们还可以这样用:

代码语言:javascript复制
select{
case <- time.Afer(time.Second * 2):
   fmt.Println("timeout programs exit. ")
case <- quit:
   fmt.Println(" 111 programs exit. ")
}

取消模式

则是使用了 GO 语言的 context 包中的提供了上下文机制,可以在协程 goroutine 之间传递 deadline,取消等信号

我们使用的时候例如可以这样:

  1. 使用 context.WithCancel 创建一个可以被取消的上下文,启动一个协程 在 3 秒后关闭上下文
  2. 使用 for 循环模拟处理业务,默认会走 select 的 default 分支
  3. 3 秒后 走到 select 的 ctx.Done(),则进入到了取消模式,程序退出
代码语言:javascript复制
ctx, cancelFunc := context.WithCancel(context.Background())

go func() {
   time.Sleep(time.Second * 3)
   cancelFunc()
}()

for {
   select {
   case <-ctx.Done():
      fmt.Println("program exit .")
      return
   default:
      fmt.Println("I'm still here.")
      time.Sleep(time.Second)
   }
}

总的来说,今天分享了 GO 语言中常见的几种并发模式:创建模式,退出模式,管道模式,超时模式和取消模式,更多的,还是要我们要思考其原理和应用起来,学习他们才能更加的有效

0 人点赞