说说channel哪些事-下篇

2022-08-15 14:37:59 浏览数 (1)

在说说channel哪些事-上篇,小编主要从channel是什么,为什么需要channel,channel基本的使用方法以及channel实现原理这个四个方面介绍了channel.本篇,小编将从实际的应用场景出发,介绍如何运用chnanel解决这些问题。channel解决的主要是并发问题,学习下面的场景,总结提炼用法,可以帮助我们更好的编写并发程序。

for select多路复用模式

for select多路复用模式非常常见,在实际的工作中也使用的非常频繁,其基本代码模式如下:

代码语言:javascript复制
for {
  select {
  case 操作 channel one:
   do something
  case 操作 channel two:
   do something
  case ...:
   do something
  }
 }

select后面跟多个case,每个case是一个channel操作,操作有两种,向channel里面发送数据,或者是从channel里面取数据,哪个case满足,就执行对应case后面的操作。如果同时有多个case满足,会随机选一个执行。下面举一个实例.exitCh和tasksCh是两个channel, 通过select同时监听两个case是否满足,exitCh是一个退出chan,如果收到通知,直接退出循环,tasksCh是任务chan,收到数据就进行处理。下面的select中也可以添加一个default分支,如果其他case不满足,会走到default分支。

代码语言:javascript复制
exitCh = make(chan struct{})
tasksCh = make(chan interface{}, 8)

for {
  select {
  case <-exitCh:
        return
  case data := <-tasksCh:
        process(data)
      }
}

select timeout模式

select timeout模式也很常见,比如一个服务需要访问数据库或者向其他模块请求数据,因为处理的时间不能确定,不可能一直等待它们处理完成,等待太长用户体验不好,这时可以设置一个超时时间,如果超过这个时间数据库还没处理完,或者没有请求到数据,提前返回给用户提示。这种场景就可以使用select timeout模式。下面结合一个例子说明,select同时监听了taskCh和time.After,这个例子会输出处理超时了,也就是走到到time.After逻辑里面。因为taskCh通道中有数据会在5秒后,这里模拟一个耗时比较长的任务,time.After里面超时是1秒中,所以第二个case先满足。说明下,time.After返回的也是一个channel,类型为<-chan Time,可以在time/sleep.go中查看。在1秒时间到后,会往此channel里面发送一个数据。

代码语言:javascript复制
func main() {
 taskCh := make(chan int)
 go func() {
  //模拟长耗时任务
  time.Sleep(5 * time.Second)
  taskCh <- 1
 }()

 select {
 case data := <-taskCh:
  fmt.Println(data)
 case <-time.After(1 * time.Second):
  fmt.Println("处理超时了")
 }
 // 输出:处理超时了
}

pipeline流水线模式

pipeline模式也被称为流水线模式,模拟的就是现实生活中的流水线生产。拿一部电动汽车生产来说,有做底盘的,有做轮胎的,有做发动机的,有做车壳的,最后将经过组装,就变成了一部汽车。每第一道工序的输出是下一道工序的输出,下一道工序制作依赖上一道工序,整个加工的产品在工序之间传递。整个流程可以抽象成生产者消费者模型,如下图,对于工序2来来说,工序1是生产者,工序3是消费者,对于工序3来说,工序2是生产者。

理解了流水线模式,现在来实战下。

有3个编号为1,2,3的goroutine,每隔1秒有1个goroutine打印自己的编号。编写一个程序,让输出的编号信息按1,2,3,1,2,3,1,2,3...顺序打印。

每个编号的打印可以对应一道生产工序,编号之间是有顺序关系的,非常像流水线。定义一个channel,来传递令牌信息,谁取得令牌,就可以打印自己的编号,打印完成之后,休息1秒,将令牌传递给下一个goroutine. 上面的思考题有3个goroutine,定义1个含有3个元素的chan切片,每个goroutine从自己的chan中获取数据,获取到了就打印。实现代码如下:

代码语言:javascript复制
const (
 max = 3
)

func main() {
 var chs []chan struct{}
 chs = make([]chan struct{}, 0, max)

 for i := 0; i < max; i   {
  chs = append(chs, make(chan struct{}))
 }

 for i := 0; i < max; i   {
  go func(chs []chan struct{}, i int) {
   for {
    <-chs[i]
    fmt.Println(i   1)
    time.Sleep(time.Second)
    chs[(i 1)%max] <- struct{}{}
   }
  }(chs, i)
 }

 chs[0] <- struct{}{}

 select {}
}

上面的实现逻辑整理的时序图如下图所示,结合下面图来看,处理逻辑非常清晰。

lock锁模式

channel的发送和接收之间存在着先后顺序关系,接收者能够接收到数据前,发送者已发送数据。channel是线程安全的,使用channel来实现互斥锁很容易,不需要担心线程之间的data race问题。如何实现呢?借助有缓冲区的channel, 申请一个有1个元素的channel。开始时先放入一个元素,这个元素代表锁,谁获取到了这个元素,相当于获取到了锁。我们可以封装一个锁数据结构,实现如下。

代码语言:javascript复制
// 使用channel实现一个互斥锁
type myMutex struct {
 ch chan struct{}
}

// MyMutex构造函数
func NewMyMutex() *myMutex {
 mu := &myMutex{make(chan struct{}, 1)}
 mu.ch <- struct{}{}
 return mu
}

// 获取锁
func (m *myMutex) Lock() {
 <-m.ch
}

// 释放锁
func (m *myMutex) Unlock() {
 select {
 case m.ch <- struct{}{}:
 default:
  panic("unlock of unlocked mutex")
 }
}

// 尝试获取锁
func (m *myMutex) TryLock() bool {
 select {
 case <-m.ch:
  return true
 default:
  return false
 }
}

// 锁是否未被使用
func (m *myMutex) IsLocked() bool {
 return len(m.ch) == 0
}

// 实现带有超时功能,结合一个超时channel
func (m *myMutex) LockWithTimeout(timeout time.Duration) bool {
 timer := time.NewTimer(timeout)
 select {
 case <-m.ch:
  timer.Stop()
  return true
 case <-timer.C:
  return false
 }
}

func main() {
 myMutex := NewMyMutex()

 myMutex.Lock()
 fmt.Println("获取到了锁")
 myMutex.Unlock()

 if myMutex.IsLocked() {
  fmt.Println("锁已被使用")
 } else {
  fmt.Println("锁未被使用")
 }

 myMutex.Lock()
 if myMutex.IsLocked() {
  fmt.Println("锁已被使用")
 } else {
  fmt.Println("锁未被使用")
 }
 myMutex.Unlock()

 myMutex.Lock()
 if !myMutex.LockWithTimeout(time.Second) {
  fmt.Println("获取锁超时,稍后再试...")
 }
 myMutex.Unlock()
}

上面的代码不光实现锁基本的Lock和Unlock功能,还实现了TryLock和LockWithTimeout功能,利用select channel default/timer的功能,很容拓展出这些。上面的实现是从chan中获取到了元素表示获取到了锁,还是有另外一种实现方式,就是谁成功向chan中放入一个元素,表示谁就获取到了锁。实现方式与上面类型,小编这里给出一个实现参考。

代码语言:javascript复制
// 使用channel实现一个互斥锁
type myMutex2 struct {
 ch chan struct{}
}

// MyMutex构造函数
func NewMyMutex2() *myMutex2 {
 mu := &myMutex2{make(chan struct{}, 1)}
 return mu
}

// 获取锁
func (m *myMutex2) Lock() {
 m.ch <- struct{}{}
}

// 释放锁
func (m *myMutex2) Unlock() {
 select {
 case <-m.ch:
 default:
  panic("unlock of unlocked mutex")
 }
}

// 尝试获取锁
func (m *myMutex2) TryLock() bool {
 select {
 case m.ch <- struct{}{}:
  return true
 default:
  return false
 }
}

// 锁是否未被使用
func (m *myMutex2) IsLocked() bool {
 return len(m.ch) == 1
}

// 实现带有超时功能,结合一个超时channel
func (m *myMutex2) LockWithTimeout(timeout time.Duration) bool {
 timer := time.NewTimer(timeout)
 select {
 case m.ch <- struct{}{}:
  timer.Stop()
  return true
 case <-timer.C:
  return false
 }
}

fan-in扇入模式

扇入模式可以通俗叫做merge模式,就是有多路输入,合并到一路输出。在计算机领域,模块的扇入是指有多少个上级模块调用它。对于本文来说,是说有多个channel输入,一个channel输出。如果将输入理解成生产者,输出理解成消费者,那扇入模式可以理解成多个生产者 1个消费者模型。结合下面的代码做一个分析,FanInPattern定义了扇入处理模式,这个函数入参是一个 chan int切片,即多个int型channel, 输出是一个chan int类型,对应上面介绍的多个输入channel和1个输出channel. 内部是如何实现的呢?定义了一个procees函数模拟处理任务,这里是将输入channel中的数据读取出来发送给输出channel.每个任务开启了一个goroutine单独处理的,也就是说输入channel的数据处理是并发的,它们之间互不影响,效率非常高。

代码语言:javascript复制
func FanInPattern(chs ...<-chan int) <-chan int {
 var wg sync.WaitGroup

 outCh := make(chan int)
 process := func(ch <-chan int) {
  defer wg.Done()
  for data := range ch {
   outCh <- data
  }
 }

 wg.Add(len(chs))
 for _, ch := range chs {
  go process(ch)
 }

 go func() {
  wg.Wait()
  close(outCh)
 }()

 return outCh
}

fan-out扇出模式

有了上面扇入介绍,很容易理解扇出模式。扇出模式可以看作是扇入模式的逆过程,扇入是多个输入channel对应到一个输出channel,那扇出模式就是一个输入channel对应到多个输出channle.从生产者消费者角度理解,扇出模式是单生产者 多消费者模型。可以将扇出模式通俗理解为split模式。下面给出一个扇出模式示例。此示例将输入chan中的每个数据都会分发到4个输出chan上。分发的时候采用goroutine处理,每个process是独立的。外部函数可以从输出的多路chan上获取相应的数据。

代码语言:javascript复制
// 扇出模式,将ch中的每个数据发送给所有的out chan, 发送时采用非阻塞处理
// FanOutPattern内部启动了一个goroutine处理,不会阻塞调用方
func FanOutPattern(ch <-chan int, out []chan int) {
 go func() {
  var wg sync.WaitGroup

  defer func() {
   wg.Wait()
   for i := 0; i < len(out); i   {
    close(out[i])
   }
  }()
  for data := range ch {
   vd := data
   wg.Add(len(out))
   for i := 0; i < len(out); i   {
    go func(i int) {
     defer wg.Done()
     out[i] <- vd
    }(i)
   }
  }
 }()
}

func main() {
 inCh := make(chan int, 16)
 outChs := []chan int{make(chan int), make(chan int), make(chan int), make(chan int)}

 for i := 0; i < 16; i   {
  inCh <- i
 }
 close(inCh)

 FanOutPattern(inCh, outChs)

 for _, outCh := range outChs {
  go func(outCh <-chan int) {
   for data := range outCh {
    fmt.Println(data)
   }
  }(outCh)
 }

 time.Sleep(time.Second * 10)
}

stream流模式

chan可以做一个管道,向chan中发送数据,然后从chan取走数据可以看成数据在chan这个管道中像水从管道中流过。在流过chan的时候,我们可以做一些过滤。下面这个例子对流过chan的数据做奇偶校验,只保留偶数,奇数将会被过滤掉。in和outCh分别为输入和输出chan,从in中获取元素,判断其类型,如果是偶数,放入输出outCh chan中。调用方从输出chan中获取偶数数据。当然实际工作用像这么简单的处理,完全可用不chan来做,直接用循环处理即可。这里只是举例说明流处理这种模式。

代码语言:javascript复制
func StreamPattern(in <-chan int) <-chan int {
 outCh := make(chan int)

 go func() {
  defer close(outCh)
  for data := range in {
   if data%2 == 0 {
    outCh <- data
   }
  }
 }()
 return outCh
}

func main() {
 inCh := make(chan int, 8)
 for i := 1; i <= 8; i   {
  inCh <- i
 }
 close(inCh)

 outCh := StreamPattern(inCh)
 for data := range outCh {
  fmt.Println(data)
 }
}

map-reduce模型

map-reduce是一种分布式数据处理框架,前几年非常火的hadoop, 就是mpa-reduce的一种实现。map-reduce处理流程大体分为两个步骤,正如这个英文单词一样,分为map和reduce两个阶段。hadoop实现中还有其他的一些阶段,比如shuffle阶段,这里不详细展开说明,感兴趣的读者可以阅读GFS论文等资料。下面的代码完成map-reduce版的词频统计功能,采用chan实现,在多核cpu下能够发挥并行优势,提升处理效率。

代码语言:javascript复制
// 实现map操作,对于in chan中数%4进行分组,余数相同的会分到一组
// 每一组对应一个输出chan,对应分组的数据会输出到对应的chan中
func mapChan(in <-chan int) []chan int {
 var wg sync.WaitGroup
 outChs := []chan int{make(chan int), make(chan int), make(chan int), make(chan int)}

 go func() {
  defer func() {
   wg.Wait()
   for i := 0; i < len(outChs); i   {
    close(outChs[i])
   }

  }()

  for data := range in {
   dv := data
   wg.Add(1)
   go func() {
    defer wg.Done()
    outChs[dv%len(outChs)] <- dv
   }()
  }
 }()

 return outChs
}

// 实现reduce操作,对chs中每个chan开启一个goroutine并行统计统计每次
// 值出现的次数,统计完成之后输出到输出chan merge中
func reduce(chs []chan int) <-chan map[int]int {
 var wg sync.WaitGroup

 merge := make(chan map[int]int)

 go func() {
  wg.Add(len(chs))
  defer func() {
   wg.Wait()
   close(merge)
  }()

  for _, ch := range chs {
   go func(ch <-chan int) {
    defer wg.Done()
    m := make(map[int]int)
    for data := range ch {
     m[data]  = 1
    }
    merge <- m
   }(ch)
  }
 }()

 return merge
}

func main() {
 data := make([]int, 0, 10)
 for i := 0; i < 10; i   {
  data = append(data, rand.Intn(5))
 }
 fmt.Println("data values ", data)
 ch := make(chan int)
 go func() {
  for _, dv := range data {
   ch <- dv
  }
  close(ch)
 }()

 outCh := reduce(mapChan(ch))

 for data := range outCh {
  for k, v := range data {
   fmt.Println("data", k, "appear", v)
  }
 }
}

通过反射操作channel

操作少量的chan可以使用select,前面的例子都是使用select来操作的。但是如果要操作的chan有很多,一个一个写select太恐怖了。这种情况可以采用reflect.Select函数,可以将一组运行时的chan传入,当作参数执行.select 是伪随机的,在执行的 case 中随机选择一个 case,并把选择的这个 case 的索引(第一个返回值)返回,如果没有可用的 case 返回,会返回一个 bool 类型的返回值, 这个返回值用来表示是否有 case 成功被选择。如果是 recv case,还会返回接收的元素。下面的例子同时操作3个chan,采用的是reflect.Select,传入3个reflect.SelectCase, 返回给我们选中了哪个chan.

代码语言:javascript复制
func main() {
 c := make(chan int, 1)
 vc := reflect.ValueOf(c)
 succeeded := vc.TrySend(reflect.ValueOf(888))
 fmt.Println(succeeded, vc.Len(), vc.Cap())

 vSend, vZero := reflect.ValueOf(888), reflect.Value{}
 branches := []reflect.SelectCase{
  {Dir: reflect.SelectDefault, Chan: vZero, Send: vZero},
  {Dir: reflect.SelectRecv, Chan: vc, Send: vZero},
  {Dir: reflect.SelectSend, Chan: vc, Send: vSend},
 }

 // 使用reflect.Select处理很多种chan的情况
 selIndex, vRecv, sentBeforeClosed := reflect.Select(branches)
 fmt.Println(selIndex)
 fmt.Println(sentBeforeClosed)
 fmt.Println(vRecv.Int())
 vc.Close()

}

总结

上面列举了chan常见的应用模式,归纳起来是这几种场景。1是任务编排,让一组 goroutine 按照一定的顺序并发或者串行的执行,2是做信号通知,一个 goroutine 可以将信号chan已有数据,chan已关闭等传递给另一 个或者另一组goroutine,3是数据传递,我们使用最多的也是这种形式,通过chan来达到共享内存的目的。4是锁功能,利用 Channel 也可以实现互斥锁的机制。5是数据的“分-合”,当作并发的queue使用,解决生产者和消费者问题,多个 goroutine 可以并发当作生产者和消费者。

Channel in Golang[1]总结了才知道 channel有这么多用法[2]

Reference

[1]

Channel in Golang: https://www.geeksforgeeks.org/channel-in-golang/

[2]

总结了才知道 channel有这么多用法: https://segmentfault.com/a/1190000017958702

0 人点赞