在说说channel哪些事-上篇,小编主要从channel是什么,为什么需要channel,channel基本的使用方法以及channel实现原理这个四个方面介绍了channel.本篇,小编将从实际的应用场景出发,介绍如何运用chnanel解决这些问题。channel解决的主要是并发问题,学习下面的场景,总结提炼用法,可以帮助我们更好的编写并发程序。
for select多路复用模式
for select多路复用模式非常常见,在实际的工作中也使用的非常频繁,其基本代码模式如下:
代码语言:javascript复制for {
select {
case 操作 channel one:
do something
case 操作 channel two:
do something
case ...:
do something
}
}
select后面跟多个case,每个case是一个channel操作,操作有两种,向channel里面发送数据,或者是从channel里面取数据,哪个case满足,就执行对应case后面的操作。如果同时有多个case满足,会随机选一个执行。下面举一个实例.exitCh和tasksCh是两个channel, 通过select同时监听两个case是否满足,exitCh是一个退出chan,如果收到通知,直接退出循环,tasksCh是任务chan,收到数据就进行处理。下面的select中也可以添加一个default分支,如果其他case不满足,会走到default分支。
代码语言:javascript复制exitCh = make(chan struct{})
tasksCh = make(chan interface{}, 8)
for {
select {
case <-exitCh:
return
case data := <-tasksCh:
process(data)
}
}
select timeout模式
select timeout模式也很常见,比如一个服务需要访问数据库或者向其他模块请求数据,因为处理的时间不能确定,不可能一直等待它们处理完成,等待太长用户体验不好,这时可以设置一个超时时间,如果超过这个时间数据库还没处理完,或者没有请求到数据,提前返回给用户提示。这种场景就可以使用select timeout模式。下面结合一个例子说明,select同时监听了taskCh和time.After,这个例子会输出处理超时了,也就是走到到time.After逻辑里面。因为taskCh通道中有数据会在5秒后,这里模拟一个耗时比较长的任务,time.After里面超时是1秒中,所以第二个case先满足。说明下,time.After返回的也是一个channel,类型为<-chan Time,可以在time/sleep.go中查看。在1秒时间到后,会往此channel里面发送一个数据。
代码语言:javascript复制func main() {
taskCh := make(chan int)
go func() {
//模拟长耗时任务
time.Sleep(5 * time.Second)
taskCh <- 1
}()
select {
case data := <-taskCh:
fmt.Println(data)
case <-time.After(1 * time.Second):
fmt.Println("处理超时了")
}
// 输出:处理超时了
}
pipeline流水线模式
pipeline模式也被称为流水线模式,模拟的就是现实生活中的流水线生产。拿一部电动汽车生产来说,有做底盘的,有做轮胎的,有做发动机的,有做车壳的,最后将经过组装,就变成了一部汽车。每第一道工序的输出是下一道工序的输出,下一道工序制作依赖上一道工序,整个加工的产品在工序之间传递。整个流程可以抽象成生产者消费者模型,如下图,对于工序2来来说,工序1是生产者,工序3是消费者,对于工序3来说,工序2是生产者。
理解了流水线模式,现在来实战下。
有3个编号为1,2,3的goroutine,每隔1秒有1个goroutine打印自己的编号。编写一个程序,让输出的编号信息按1,2,3,1,2,3,1,2,3...顺序打印。
每个编号的打印可以对应一道生产工序,编号之间是有顺序关系的,非常像流水线。定义一个channel,来传递令牌信息,谁取得令牌,就可以打印自己的编号,打印完成之后,休息1秒,将令牌传递给下一个goroutine. 上面的思考题有3个goroutine,定义1个含有3个元素的chan切片,每个goroutine从自己的chan中获取数据,获取到了就打印。实现代码如下:
代码语言:javascript复制const (
max = 3
)
func main() {
var chs []chan struct{}
chs = make([]chan struct{}, 0, max)
for i := 0; i < max; i {
chs = append(chs, make(chan struct{}))
}
for i := 0; i < max; i {
go func(chs []chan struct{}, i int) {
for {
<-chs[i]
fmt.Println(i 1)
time.Sleep(time.Second)
chs[(i 1)%max] <- struct{}{}
}
}(chs, i)
}
chs[0] <- struct{}{}
select {}
}
上面的实现逻辑整理的时序图如下图所示,结合下面图来看,处理逻辑非常清晰。
lock锁模式
channel的发送和接收之间存在着先后顺序关系,接收者能够接收到数据前,发送者已发送数据。channel是线程安全的,使用channel来实现互斥锁很容易,不需要担心线程之间的data race问题。如何实现呢?借助有缓冲区的channel, 申请一个有1个元素的channel。开始时先放入一个元素,这个元素代表锁,谁获取到了这个元素,相当于获取到了锁。我们可以封装一个锁数据结构,实现如下。
代码语言:javascript复制// 使用channel实现一个互斥锁
type myMutex struct {
ch chan struct{}
}
// MyMutex构造函数
func NewMyMutex() *myMutex {
mu := &myMutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 获取锁
func (m *myMutex) Lock() {
<-m.ch
}
// 释放锁
func (m *myMutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *myMutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
return false
}
}
// 锁是否未被使用
func (m *myMutex) IsLocked() bool {
return len(m.ch) == 0
}
// 实现带有超时功能,结合一个超时channel
func (m *myMutex) LockWithTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
return false
}
}
func main() {
myMutex := NewMyMutex()
myMutex.Lock()
fmt.Println("获取到了锁")
myMutex.Unlock()
if myMutex.IsLocked() {
fmt.Println("锁已被使用")
} else {
fmt.Println("锁未被使用")
}
myMutex.Lock()
if myMutex.IsLocked() {
fmt.Println("锁已被使用")
} else {
fmt.Println("锁未被使用")
}
myMutex.Unlock()
myMutex.Lock()
if !myMutex.LockWithTimeout(time.Second) {
fmt.Println("获取锁超时,稍后再试...")
}
myMutex.Unlock()
}
上面的代码不光实现锁基本的Lock和Unlock功能,还实现了TryLock和LockWithTimeout功能,利用select channel default/timer的功能,很容拓展出这些。上面的实现是从chan中获取到了元素表示获取到了锁,还是有另外一种实现方式,就是谁成功向chan中放入一个元素,表示谁就获取到了锁。实现方式与上面类型,小编这里给出一个实现参考。
代码语言:javascript复制// 使用channel实现一个互斥锁
type myMutex2 struct {
ch chan struct{}
}
// MyMutex构造函数
func NewMyMutex2() *myMutex2 {
mu := &myMutex2{make(chan struct{}, 1)}
return mu
}
// 获取锁
func (m *myMutex2) Lock() {
m.ch <- struct{}{}
}
// 释放锁
func (m *myMutex2) Unlock() {
select {
case <-m.ch:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *myMutex2) TryLock() bool {
select {
case m.ch <- struct{}{}:
return true
default:
return false
}
}
// 锁是否未被使用
func (m *myMutex2) IsLocked() bool {
return len(m.ch) == 1
}
// 实现带有超时功能,结合一个超时channel
func (m *myMutex2) LockWithTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case m.ch <- struct{}{}:
timer.Stop()
return true
case <-timer.C:
return false
}
}
fan-in扇入模式
扇入模式可以通俗叫做merge模式,就是有多路输入,合并到一路输出。在计算机领域,模块的扇入是指有多少个上级模块调用它。对于本文来说,是说有多个channel输入,一个channel输出。如果将输入理解成生产者,输出理解成消费者,那扇入模式可以理解成多个生产者 1个消费者模型。结合下面的代码做一个分析,FanInPattern定义了扇入处理模式,这个函数入参是一个 chan int切片,即多个int型channel, 输出是一个chan int类型,对应上面介绍的多个输入channel和1个输出channel. 内部是如何实现的呢?定义了一个procees函数模拟处理任务,这里是将输入channel中的数据读取出来发送给输出channel.每个任务开启了一个goroutine单独处理的,也就是说输入channel的数据处理是并发的,它们之间互不影响,效率非常高。
代码语言:javascript复制func FanInPattern(chs ...<-chan int) <-chan int {
var wg sync.WaitGroup
outCh := make(chan int)
process := func(ch <-chan int) {
defer wg.Done()
for data := range ch {
outCh <- data
}
}
wg.Add(len(chs))
for _, ch := range chs {
go process(ch)
}
go func() {
wg.Wait()
close(outCh)
}()
return outCh
}
fan-out扇出模式
有了上面扇入介绍,很容易理解扇出模式。扇出模式可以看作是扇入模式的逆过程,扇入是多个输入channel对应到一个输出channel,那扇出模式就是一个输入channel对应到多个输出channle.从生产者消费者角度理解,扇出模式是单生产者 多消费者模型。可以将扇出模式通俗理解为split模式。下面给出一个扇出模式示例。此示例将输入chan中的每个数据都会分发到4个输出chan上。分发的时候采用goroutine处理,每个process是独立的。外部函数可以从输出的多路chan上获取相应的数据。
代码语言:javascript复制// 扇出模式,将ch中的每个数据发送给所有的out chan, 发送时采用非阻塞处理
// FanOutPattern内部启动了一个goroutine处理,不会阻塞调用方
func FanOutPattern(ch <-chan int, out []chan int) {
go func() {
var wg sync.WaitGroup
defer func() {
wg.Wait()
for i := 0; i < len(out); i {
close(out[i])
}
}()
for data := range ch {
vd := data
wg.Add(len(out))
for i := 0; i < len(out); i {
go func(i int) {
defer wg.Done()
out[i] <- vd
}(i)
}
}
}()
}
func main() {
inCh := make(chan int, 16)
outChs := []chan int{make(chan int), make(chan int), make(chan int), make(chan int)}
for i := 0; i < 16; i {
inCh <- i
}
close(inCh)
FanOutPattern(inCh, outChs)
for _, outCh := range outChs {
go func(outCh <-chan int) {
for data := range outCh {
fmt.Println(data)
}
}(outCh)
}
time.Sleep(time.Second * 10)
}
stream流模式
chan可以做一个管道,向chan中发送数据,然后从chan取走数据可以看成数据在chan这个管道中像水从管道中流过。在流过chan的时候,我们可以做一些过滤。下面这个例子对流过chan的数据做奇偶校验,只保留偶数,奇数将会被过滤掉。in和outCh分别为输入和输出chan,从in中获取元素,判断其类型,如果是偶数,放入输出outCh chan中。调用方从输出chan中获取偶数数据。当然实际工作用像这么简单的处理,完全可用不chan来做,直接用循环处理即可。这里只是举例说明流处理这种模式。
代码语言:javascript复制func StreamPattern(in <-chan int) <-chan int {
outCh := make(chan int)
go func() {
defer close(outCh)
for data := range in {
if data%2 == 0 {
outCh <- data
}
}
}()
return outCh
}
func main() {
inCh := make(chan int, 8)
for i := 1; i <= 8; i {
inCh <- i
}
close(inCh)
outCh := StreamPattern(inCh)
for data := range outCh {
fmt.Println(data)
}
}
map-reduce模型
map-reduce是一种分布式数据处理框架,前几年非常火的hadoop, 就是mpa-reduce的一种实现。map-reduce处理流程大体分为两个步骤,正如这个英文单词一样,分为map和reduce两个阶段。hadoop实现中还有其他的一些阶段,比如shuffle阶段,这里不详细展开说明,感兴趣的读者可以阅读GFS论文等资料。下面的代码完成map-reduce版的词频统计功能,采用chan实现,在多核cpu下能够发挥并行优势,提升处理效率。
代码语言:javascript复制// 实现map操作,对于in chan中数%4进行分组,余数相同的会分到一组
// 每一组对应一个输出chan,对应分组的数据会输出到对应的chan中
func mapChan(in <-chan int) []chan int {
var wg sync.WaitGroup
outChs := []chan int{make(chan int), make(chan int), make(chan int), make(chan int)}
go func() {
defer func() {
wg.Wait()
for i := 0; i < len(outChs); i {
close(outChs[i])
}
}()
for data := range in {
dv := data
wg.Add(1)
go func() {
defer wg.Done()
outChs[dv%len(outChs)] <- dv
}()
}
}()
return outChs
}
// 实现reduce操作,对chs中每个chan开启一个goroutine并行统计统计每次
// 值出现的次数,统计完成之后输出到输出chan merge中
func reduce(chs []chan int) <-chan map[int]int {
var wg sync.WaitGroup
merge := make(chan map[int]int)
go func() {
wg.Add(len(chs))
defer func() {
wg.Wait()
close(merge)
}()
for _, ch := range chs {
go func(ch <-chan int) {
defer wg.Done()
m := make(map[int]int)
for data := range ch {
m[data] = 1
}
merge <- m
}(ch)
}
}()
return merge
}
func main() {
data := make([]int, 0, 10)
for i := 0; i < 10; i {
data = append(data, rand.Intn(5))
}
fmt.Println("data values ", data)
ch := make(chan int)
go func() {
for _, dv := range data {
ch <- dv
}
close(ch)
}()
outCh := reduce(mapChan(ch))
for data := range outCh {
for k, v := range data {
fmt.Println("data", k, "appear", v)
}
}
}
通过反射操作channel
操作少量的chan可以使用select,前面的例子都是使用select来操作的。但是如果要操作的chan有很多,一个一个写select太恐怖了。这种情况可以采用reflect.Select函数,可以将一组运行时的chan传入,当作参数执行.select 是伪随机的,在执行的 case 中随机选择一个 case,并把选择的这个 case 的索引(第一个返回值)返回,如果没有可用的 case 返回,会返回一个 bool 类型的返回值, 这个返回值用来表示是否有 case 成功被选择。如果是 recv case,还会返回接收的元素。下面的例子同时操作3个chan,采用的是reflect.Select,传入3个reflect.SelectCase, 返回给我们选中了哪个chan.
代码语言:javascript复制func main() {
c := make(chan int, 1)
vc := reflect.ValueOf(c)
succeeded := vc.TrySend(reflect.ValueOf(888))
fmt.Println(succeeded, vc.Len(), vc.Cap())
vSend, vZero := reflect.ValueOf(888), reflect.Value{}
branches := []reflect.SelectCase{
{Dir: reflect.SelectDefault, Chan: vZero, Send: vZero},
{Dir: reflect.SelectRecv, Chan: vc, Send: vZero},
{Dir: reflect.SelectSend, Chan: vc, Send: vSend},
}
// 使用reflect.Select处理很多种chan的情况
selIndex, vRecv, sentBeforeClosed := reflect.Select(branches)
fmt.Println(selIndex)
fmt.Println(sentBeforeClosed)
fmt.Println(vRecv.Int())
vc.Close()
}
总结
上面列举了chan常见的应用模式,归纳起来是这几种场景。1是任务编排,让一组 goroutine 按照一定的顺序并发或者串行的执行,2是做信号通知,一个 goroutine 可以将信号chan已有数据,chan已关闭等传递给另一 个或者另一组goroutine,3是数据传递,我们使用最多的也是这种形式,通过chan来达到共享内存的目的。4是锁功能,利用 Channel 也可以实现互斥锁的机制。5是数据的“分-合”,当作并发的queue使用,解决生产者和消费者问题,多个 goroutine 可以并发当作生产者和消费者。
Channel in Golang[1]总结了才知道 channel有这么多用法[2]
Reference
[1]
Channel in Golang: https://www.geeksforgeeks.org/channel-in-golang/
[2]
总结了才知道 channel有这么多用法: https://segmentfault.com/a/1190000017958702